# Redis Support

Spring Integration 2.1 introduced support for [Redis](https://redis.io/): “an open source advanced key-value store”.
This support comes in the form of a Redis-based `MessageStore` as well as publish-subscribe messaging adapters that are supported by Redis through its [`PUBLISH`, `SUBSCRIBE`, and `UNSUBSCRIBE`](https://redis.io/topics/pubsub) commands.

You need to include this dependency in your project:

Maven

```
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-redis</artifactId>
    <version>5.5.9</version>
</dependency>
```

Gradle

```
implementation "org.springframework.integration:spring-integration-redis:5.5.9"
```

You also need to include a Redis client dependency, such as Lettuce.

To download, install, and run Redis, see the [Redis documentation](https://redis.io/download).

### Connecting to Redis

To begin interacting with Redis, you first need to connect to it.
Spring Integration uses support provided by another Spring project, [Spring Data Redis](https://github.com/SpringSource/spring-data-redis), which provides typical Spring constructs: `ConnectionFactory` and `Template`.
Those abstractions simplify integration with several Redis client Java APIs.
Currently Spring Data Redis supports [Jedis](https://github.com/xetorthio/jedis) and [Lettuce](https://lettuce.io/).

#### Using `RedisConnectionFactory`

To connect to Redis, you can use one of the implementations of the `RedisConnectionFactory` interface.
The following listing shows the interface definition:

```
public interface RedisConnectionFactory extends PersistenceExceptionTranslator {

    /**
     * Provides a suitable connection for interacting with Redis.
     * @return connection for interacting with Redis.
     */
    RedisConnection getConnection();
}
```

The following example shows how to create a `LettuceConnectionFactory` in Java:

```
LettuceConnectionFactory cf = new LettuceConnectionFactory();
cf.afterPropertiesSet();
```

The following example shows how to create a `LettuceConnectionFactory` in Spring’s XML configuration:

```
<bean id="redisConnectionFactory"
    class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
    <property name="port" value="7379" />
</bean>
```

The implementations of `RedisConnectionFactory` provide a set of properties, such as port and host, that you can set if needed.
Once you have an instance of `RedisConnectionFactory`, you can create an instance of `RedisTemplate` and inject it with the `RedisConnectionFactory`.
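
As a minimal Java-configuration sketch of the same idea, the following declares a `LettuceConnectionFactory` bean with an explicit host and port. The class name `RedisConnectionConfig`, the host `localhost`, and the port `7379` (taken from the XML example) are assumptions for illustration only.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;

// Hypothetical configuration class; adjust host and port to your environment.
@Configuration
public class RedisConnectionConfig {

    // The method name makes the bean available as "redisConnectionFactory",
    // the name that the Redis adapters look up by default.
    @Bean
    public LettuceConnectionFactory redisConnectionFactory() {
        // Assumed standalone server location.
        RedisStandaloneConfiguration server =
                new RedisStandaloneConfiguration("localhost", 7379);
        // The Spring container calls afterPropertiesSet() on the returned bean.
        return new LettuceConnectionFactory(server);
    }
}
```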

#### Using `RedisTemplate`

As with other template classes in Spring (such as `JdbcTemplate` and `JmsTemplate`), `RedisTemplate` is a helper class that simplifies Redis data access code.
For more information about `RedisTemplate` and its variations (such as `StringRedisTemplate`), see the [Spring Data Redis documentation](https://docs.spring.io/spring-data/data-redis/docs/current/reference/html/).

The following example shows how to create an instance of `RedisTemplate` in Java:

```
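RedisTemplate<String, Object> rt = new RedisTemplate<>();
rt.setConnectionFactory(redisConnectionFactory);
// Initialize the template when it is not managed by a Spring container.
rt.afterPropertiesSet();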
```

The following example shows how to create an instance of `RedisTemplate` in Spring’s XML configuration:

```
<bean id="redisTemplate"
         class="org.springframework.data.redis.core.RedisTemplate">
    <property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
```

### Messaging with Redis

As mentioned in [the introduction](#redis), Redis provides support for publish-subscribe messaging through its `PUBLISH`, `SUBSCRIBE`, and `UNSUBSCRIBE` commands.
As with JMS and AMQP, Spring Integration provides message channels and adapters for sending and receiving messages through Redis.

#### Redis Publish/Subscribe channel

Similarly to JMS, there are cases where both the producer and consumer are intended to be part of the same application, running within the same process.
You can accomplish this by using a pair of inbound and outbound channel adapters.
However, as with Spring Integration’s JMS support, there is a simpler way to address this use case.
You can create a publish-subscribe channel, as the following example shows:

```
<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic"/>
```

A `publish-subscribe-channel` behaves much like a normal `<publish-subscribe-channel/>` element from the main Spring Integration namespace.
It can be referenced by both the `input-channel` and the `output-channel` attributes of any endpoint.
The difference is that this channel is backed by a Redis topic name: a `String` value specified by the `topic-name` attribute.
However, unlike JMS, this topic does not have to be created in advance or even auto-created by Redis.
In Redis, topics are simple `String` values that play the role of an address.
The producer and consumer can communicate by using the same `String` value as their topic name.
A simple subscription to this channel means that asynchronous publish-subscribe messaging is possible between the producing and consuming endpoints.
However, unlike the asynchronous message channels created by adding a `<queue/>` element within a simple Spring Integration `<channel/>` element, the messages are not stored in an in-memory queue.
Instead, those messages are passed through Redis, which lets you rely on its support for persistence and clustering as well as its interoperability with other non-Java platforms.

#### Redis Inbound Channel Adapter

The Redis inbound channel adapter (`RedisInboundChannelAdapter`) adapts incoming Redis messages into Spring messages in the same way as other inbound adapters.
It receives platform-specific messages (Redis in this case) and converts them to Spring messages by using a `MessageConverter` strategy.
The following example shows how to configure a Redis inbound channel adapter:

```
<int-redis:inbound-channel-adapter id="redisAdapter"
       topics="thing1, thing2"
       channel="receiveChannel"
       error-channel="testErrorChannel"
       message-converter="testConverter" />

<bean id="redisConnectionFactory"
    class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
    <property name="port" value="7379" />
</bean>

<bean id="testConverter" class="things.something.SampleMessageConverter" />
```

The preceding example shows a simple but complete configuration of a Redis inbound channel adapter.
Note that the preceding configuration relies on the familiar Spring paradigm of auto-discovering certain beans.
In this case, the `redisConnectionFactory` is implicitly injected into the adapter.
You can specify it explicitly by using the `connection-factory` attribute instead.

Also, note that the preceding configuration injects the adapter with a custom `MessageConverter`.
The approach is similar to JMS, where `MessageConverter` instances are used to convert between Redis messages and the Spring Integration message payloads.
The default is a `SimpleMessageConverter`.

Inbound adapters can subscribe to multiple topic names, hence the comma-separated set of values in the `topics` attribute.

Since version 3.0, the inbound adapter, in addition to the existing `topics` attribute, now has the `topic-patterns` attribute.
This attribute contains a comma-separated set of Redis topic patterns.
For more information regarding Redis publish-subscribe, see [Redis Pub/Sub](https://redis.io/topics/pubsub).
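
Redis topic patterns are glob-style. To illustrate how a pattern selects topics, the following plain-Java sketch translates a simplified pattern into a regular expression. It is only an approximation for building intuition: it handles `*` and `?` but not character classes or escapes, the matching in a real deployment is done by the Redis server, and the class name `TopicPatternSketch` is hypothetical.

```java
import java.util.regex.Pattern;

// Hypothetical helper that approximates glob-style topic matching.
final class TopicPatternSketch {

    // Translates a simplified glob into a regular expression.
    // '*' matches any run of characters, '?' matches exactly one character,
    // and every other character is matched literally.
    static Pattern toRegex(String glob) {
        StringBuilder regex = new StringBuilder();
        for (char c : glob.toCharArray()) {
            if (c == '*') {
                regex.append(".*");
            } else if (c == '?') {
                regex.append('.');
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL);
    }

    // Returns true when the whole topic name matches the pattern.
    static boolean matches(String glob, String topic) {
        return toRegex(glob).matcher(topic).matches();
    }

    public static void main(String[] args) {
        System.out.println(matches("news.*", "news.sports")); // true
        System.out.println(matches("thing?", "thing1"));      // true
        System.out.println(matches("thing?", "thing10"));     // false
    }
}
```

With a pattern such as `thing?` in `topic-patterns`, the adapter would receive messages published to `thing1` and `thing2` without listing each topic.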

Inbound adapters can use a `RedisSerializer` to deserialize the body of Redis messages.
The `serializer` attribute of the `<int-redis:inbound-channel-adapter>` can be set to an empty string, which results in a `null` value for the `RedisSerializer` property.
In this case, the raw `byte[]` bodies of Redis messages are provided as the message payloads.

Since version 5.0, you can provide an `Executor` instance to the inbound adapter by using the `task-executor` attribute of the `<int-redis:inbound-channel-adapter>`.
Also, the received Spring Integration messages now have the `RedisHeaders.MESSAGE_SOURCE` header to indicate the source of the published message: topic or pattern.
You can use this downstream for routing logic.

#### Redis Outbound Channel Adapter

The Redis outbound channel adapter adapts outgoing Spring Integration messages into Redis messages in the same way as other outbound adapters.
It receives Spring Integration messages and converts them to platform-specific messages (Redis in this case) by using a `MessageConverter` strategy.
The following example shows how to configure a Redis outbound channel adapter:

```
<int-redis:outbound-channel-adapter id="outboundAdapter"
    channel="sendChannel"
    topic="thing1"
    message-converter="testConverter"/>

<bean id="redisConnectionFactory"
    class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
    <property name="port" value="7379"/>
</bean>

<bean id="testConverter" class="things.something.SampleMessageConverter" />
```

The configuration parallels the Redis inbound channel adapter.
The adapter is implicitly injected with a `RedisConnectionFactory`, which is defined with `redisConnectionFactory` as its bean name.
This example also includes the optional (and custom) `MessageConverter` (the `testConverter` bean).

Since Spring Integration 3.0, the `<int-redis:outbound-channel-adapter>` offers an alternative to the `topic` attribute: you can use the `topic-expression` attribute to determine the Redis topic for the message at runtime.
The `topic` and `topic-expression` attributes are mutually exclusive.
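
A runtime topic can also be sketched in Java configuration with `RedisPublishingMessageHandler`. The following fragment assumes that `@EnableIntegration` is active, that a `sendChannel` channel and a `RedisConnectionFactory` bean exist, and that the sender sets a header named `redisTopic`; the header name and the class name `RedisOutboundConfig` are hypothetical.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.redis.outbound.RedisPublishingMessageHandler;

// Hypothetical configuration class for illustration.
@Configuration
public class RedisOutboundConfig {

    @Bean
    @ServiceActivator(inputChannel = "sendChannel")
    public RedisPublishingMessageHandler redisPublisher(RedisConnectionFactory redisConnectionFactory) {
        RedisPublishingMessageHandler handler =
                new RedisPublishingMessageHandler(redisConnectionFactory);
        // Resolve the topic from a message header at runtime
        // ("redisTopic" is an assumed header name).
        handler.setTopicExpression(
                new SpelExpressionParser().parseExpression("headers['redisTopic']"));
        return handler;
    }
}
```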

#### Redis Queue Inbound Channel Adapter

Spring Integration 3.0 introduced a queue inbound channel adapter to “pop” messages from a Redis list.
By default, it uses “right pop”, but you can configure it to use “left pop” instead.
The adapter is message-driven.
It uses an internal listener thread and does not use a poller.

The following listing shows all the available attributes for `queue-inbound-channel-adapter`:

```
<int-redis:queue-inbound-channel-adapter id=""  (1)
                    channel=""  (2)
                    auto-startup=""  (3)
                    phase=""  (4)
                    connection-factory=""  (5)
                    queue=""  (6)
                    error-channel=""  (7)
                    serializer=""  (8)
                    receive-timeout=""  (9)
                    recovery-interval=""  (10)
                    expect-message=""  (11)
                    task-executor=""  (12)
                    right-pop=""/>  (13)
```

|**1** |                              The component bean name.<br/>If you do not provide the `channel` attribute, a `DirectChannel` is created and registered in the application context with this `id` attribute as the bean name.<br/>In this case, the endpoint itself is registered with the bean name `id` plus `.adapter`.<br/>(If the bean name were `thing1`, the endpoint is registered as `thing1.adapter`.)                              |
|------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|**2** |                                                                                                                                                                               The `MessageChannel` to which to send `Message` instances from this Endpoint.                                                                                                                                                                                |
|**3** |                                                                                                                                      A `SmartLifecycle` attribute to specify whether this endpoint should start automatically after the application context start or not.<br/>It defaults to `true`.                                                                                                                                       |
|**4** |                                                                                                                                                                A `SmartLifecycle` attribute to specify the phase in which this endpoint is started.<br/>It defaults to `0`.                                                                                                                                                                |
|**5** |                                                                                                                                                                        A reference to a `RedisConnectionFactory` bean.<br/>It defaults to `redisConnectionFactory`.                                                                                                                                                                        |
|**6** |                                                                                                                                                                  The name of the Redis list on which the queue-based 'pop' operation is performed to get Redis messages.                                                                                                                                                                   |
|**7** |                                                                                     The `MessageChannel` to which to send `ErrorMessage` instances when exceptions are received from the listening task of the endpoint.<br/>By default, the underlying `MessagePublishingErrorHandler` uses the default `errorChannel` from the application context.                                                                                      |
|**8** |                                                                              The `RedisSerializer` bean reference.<br/>It can be an empty string, which means 'no serializer'.<br/>In this case, the raw `byte[]` from the inbound Redis message is sent to the `channel` as the `Message` payload.<br/>By default it is a `JdkSerializationRedisSerializer`.                                                                              |
|**9** |                                                                                                                                                          The timeout in milliseconds for 'pop' operation to wait for a Redis message from the queue.<br/>The default is 1 second.                                                                                                                                                          |
|**10**|                                                                                                                                              The time in milliseconds for which the listener task should sleep after exceptions on the 'pop' operation, before restarting the listener task.                                                                                                                                               |
|**11**|                                                               Specifies whether this endpoint expects data from the Redis queue to contain entire `Message` instances.<br/>If this attribute is set to `true`, the `serializer` cannot be an empty string, because messages require some form of deserialization (JDK serialization by default).<br/>Its default is `false`.                                                               |
|**12**|                                                                                                                              A reference to a Spring `TaskExecutor` (or standard JDK 1.5+ `Executor`) bean.<br/>It is used for the underlying listening task.<br/>It defaults to a `SimpleAsyncTaskExecutor`.                                                                                                                              |
|**13**|Specifies whether this endpoint should use “right pop” (when `true`) or “left pop” (when `false`) to read messages from the Redis list.<br/>If `true`, the Redis List acts as a `FIFO` queue when used with a default Redis queue outbound channel adapter.<br/>Set it to `false` to use with software that writes to the list with “right push” or to achieve a stack-like message order.<br/>Its default is `true`.<br/>Since version 4.3.|

|   |The `task-executor` has to be configured with more than one thread for processing; otherwise there is a possible deadlock when the `RedisQueueMessageDrivenEndpoint` tries to restart the listener task after an error.<br/>The `errorChannel` can be used to process those errors, to avoid restarts, but it is preferable not to expose your application to the possible deadlock situation.<br/>See the Spring Framework [Reference Manual](https://docs.spring.io/spring/docs/current/spring-framework-reference/integration.html#scheduling-task-executor-types) for possible `TaskExecutor` implementations.|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
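
The attributes above map onto setters of `RedisQueueMessageDrivenEndpoint`. The following Java-configuration fragment is a sketch under stated assumptions: `@EnableIntegration` is active, a `RedisConnectionFactory` bean exists, and the queue name `si.test.queue`, the bean names, and the timing values are illustrative only. It uses a two-thread executor so that the listener task can be restarted after an error.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.redis.inbound.RedisQueueMessageDrivenEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

// Hypothetical configuration class for illustration.
@Configuration
public class RedisQueueInboundConfig {

    @Bean
    public MessageChannel fromRedisQueue() {
        return new DirectChannel();
    }

    // More than one thread, so a listener restart cannot block itself.
    @Bean
    public ThreadPoolTaskExecutor redisQueueExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        return executor;
    }

    @Bean
    public RedisQueueMessageDrivenEndpoint redisQueueInbound(
            RedisConnectionFactory redisConnectionFactory) {
        RedisQueueMessageDrivenEndpoint endpoint =
                new RedisQueueMessageDrivenEndpoint("si.test.queue", redisConnectionFactory);
        endpoint.setOutputChannel(fromRedisQueue());
        endpoint.setTaskExecutor(redisQueueExecutor());
        endpoint.setReceiveTimeout(1000);   // wait up to 1 second per pop
        endpoint.setRecoveryInterval(5000); // assumed pause after a failed pop
        endpoint.setRightPop(true);         // the documented default
        return endpoint;
    }
}
```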

#### Redis Queue Outbound Channel Adapter

Spring Integration 3.0 introduced a queue outbound channel adapter to “push” to a Redis list from Spring Integration messages.
By default, it uses “left push”, but you can configure it to use “right push” instead.
The following listing shows all the available attributes for a Redis `queue-outbound-channel-adapter`:

```
<int-redis:queue-outbound-channel-adapter id=""  (1)
                    channel=""  (2)
                    connection-factory=""  (3)
                    queue=""  (4)
                    queue-expression=""  (5)
                    serializer=""  (6)
                    extract-payload=""  (7)
                    left-push=""/>  (8)
```

|**1**|                                The component bean name.<br/>If you do not provide the `channel` attribute, a `DirectChannel` is created and registered in the application context with this `id` attribute as the bean name.<br/>In this case, the endpoint is registered with a bean name of `id` plus `.adapter`.<br/>(If the bean name were `thing1`, the endpoint is registered as `thing1.adapter`.)                                 |
|-----|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|**2**|                                                                                                                                                                                The `MessageChannel` from which this endpoint receives `Message` instances.                                                                                                                                                                                |
|**3**|                                                                                                                                                                       A reference to a `RedisConnectionFactory` bean.<br/>It defaults to `redisConnectionFactory`.                                                                                                                                                                        |
|**4**|                                                                                                                                The name of the Redis list on which the queue-based 'push' operation is performed to send Redis messages.<br/>This attribute is mutually exclusive with `queue-expression`.                                                                                                                                |
|**5**|                                                                                                                        A SpEL `Expression` to determine the name of the Redis list.<br/>It uses the incoming `Message` at runtime as the `#root` variable.<br/>This attribute is mutually exclusive with `queue`.                                                                                                                         |
|**6**|                                                                                                              A `RedisSerializer` bean reference.<br/>It defaults to a `JdkSerializationRedisSerializer`.<br/>However, for `String` payloads, a `StringRedisSerializer` is used, if a `serializer` reference is not provided.                                                                                                              |
|**7**|                                                                                                                                                    Specifies whether this endpoint should send only the payload or the entire `Message` to the Redis queue.<br/>It defaults to `true`.                                                                                                                                                    |
|**8**|Specifies whether this endpoint should use “left push” (when `true`) or “right push” (when `false`) to write messages to the Redis list.<br/>If `true`, the Redis list acts as a `FIFO` queue when used with a default Redis queue inbound channel adapter.<br/>Set it to `false` to use with software that reads from the list with “left pop” or to achieve a stack-like message order.<br/>It defaults to `true`.<br/>Since version 4.3.|
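
The interplay between the push and pop directions can be modeled without Redis. The following plain-Java sketch uses an `ArrayDeque` as a stand-in for a Redis list: pushing on the left and popping on the right yields FIFO order, while popping on the same side as the push yields stack-like order. The class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory model of a Redis list used as a queue.
final class RedisListOrderSketch {

    // Pushes every item on the left end (like a "left push"), then drains the
    // list from the right ("right pop") or from the left ("left pop").
    static List<String> pushLeftThenPop(List<String> items, boolean rightPop) {
        Deque<String> list = new ArrayDeque<>();
        for (String item : items) {
            list.addFirst(item);
        }
        List<String> received = new ArrayList<>();
        while (!list.isEmpty()) {
            received.add(rightPop ? list.pollLast() : list.pollFirst());
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> sent = List.of("a", "b", "c");
        System.out.println(pushLeftThenPop(sent, true));  // [a, b, c] (FIFO)
        System.out.println(pushLeftThenPop(sent, false)); // [c, b, a] (stack-like)
    }
}
```

This is why the two adapters behave as a FIFO queue when both keep their defaults, and why changing only one side reverses the order.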

#### Redis Application Events

Since Spring Integration 3.0, the Redis module provides an implementation of `IntegrationEvent`, which, in turn, is an `org.springframework.context.ApplicationEvent`.
The `RedisExceptionEvent` encapsulates exceptions from Redis operations (with the endpoint being the “source” of the event).
For example, the `<int-redis:queue-inbound-channel-adapter/>` emits those events after catching exceptions from the `BoundListOperations.rightPop` operation.
The exception may be any generic `org.springframework.data.redis.RedisSystemException` or an `org.springframework.data.redis.RedisConnectionFailureException`.
Handling these events with an `<int-event:inbound-channel-adapter/>` can be useful to determine problems with background Redis tasks and to take administrative actions.
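
Because `RedisExceptionEvent` is a regular application event, it can also be observed with a plain Spring event listener. The following fragment is a minimal sketch; the class name `RedisFailureLogger` is hypothetical, and writing to `System.err` stands in for whatever administrative action you would take.

```java
import org.springframework.context.event.EventListener;
import org.springframework.integration.redis.event.RedisExceptionEvent;
import org.springframework.stereotype.Component;

// Hypothetical listener for illustration.
@Component
public class RedisFailureLogger {

    @EventListener
    public void onRedisFailure(RedisExceptionEvent event) {
        // getSource() is the endpoint that raised the event;
        // getCause() is the exception caught from the Redis operation.
        System.err.println("Redis failure in " + event.getSource()
                + ": " + event.getCause());
    }
}
```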

### Redis Message Store

As described in the *Enterprise Integration Patterns* (EIP) book, a [message store](https://www.enterpriseintegrationpatterns.com/MessageStore.html) lets you persist messages.
This can be useful when dealing with components that have a capability to buffer messages (aggregator, resequencer, and others) when reliability is a concern.
In Spring Integration, the `MessageStore` strategy also provides the foundation for the [claim check](https://www.enterpriseintegrationpatterns.com/StoreInLibrary.html) pattern, which is described in EIP as well.

Spring Integration’s Redis module provides the `RedisMessageStore`.
The following example shows how to use it with an aggregator:

```
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisMessageStore">
    <constructor-arg ref="redisConnectionFactory"/>
</bean>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
         message-store="redisMessageStore"/>
```

The preceding example is a bean configuration, and it expects a `RedisConnectionFactory` as a constructor argument.

By default, the `RedisMessageStore` uses Java serialization to serialize the message.
However, if you want to use a different serialization technique (such as JSON), you can provide your own serializer by setting the `valueSerializer` property of the `RedisMessageStore`.

Starting with version 4.3.10, the framework provides a Jackson deserializer for `Message` instances (`MessageJacksonDeserializer`) and a Jackson serializer for `MessageHeaders` instances (`MessageHeadersJacksonSerializer`).
They have to be registered with the `ObjectMapper` through the `SimpleModule` options.
In addition, you should set `enableDefaultTyping` on the `ObjectMapper` to add type information for each serialized complex object (if you trust the source).
That type information is then used during deserialization.
The framework provides a utility method called `JacksonJsonUtils.messagingAwareMapper()`, which is already supplied with all the previously mentioned properties and serializers.
This utility method comes with the `trustedPackages` argument to limit Java packages for deserialization to avoid security vulnerabilities.
The default trusted packages are `java.util`, `java.lang`, `org.springframework.messaging.support`, `org.springframework.integration.support`, `org.springframework.integration.message`, and `org.springframework.integration.store`.
To manage JSON serialization in the `RedisMessageStore`, you must configure it in a fashion similar to the following example:

```
RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
store.setValueSerializer(serializer);
```

Starting with version 4.3.12, `RedisMessageStore` supports the `prefix` option to allow distinguishing between instances of the store on the same Redis server.

#### Redis Channel Message Stores

The `RedisMessageStore` [shown earlier](#redis-message-store) maintains each group as a value under a single key (the group ID).
While you can use this to back a `QueueChannel` for persistence, a specialized `RedisChannelMessageStore` is provided for that purpose (since version 4.0).
This store uses a `LIST` for each channel, `LPUSH` when sending messages, and `RPOP` when receiving messages.
By default, this store also uses JDK serialization, but you can modify the value serializer, as [described earlier](#redis-message-store).

We recommend using this store to back channels, instead of using the general `RedisMessageStore`.
The following example defines a Redis message store and uses it in a channel with a queue:

```
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore">
    <constructor-arg ref="redisConnectionFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="redisMessageStore"/>
</int:channel>
```

The keys used to store the data have the form: `<storeBeanName>:<channelId>` (in the preceding example, `redisMessageStore:somePersistentQueueChannel`).

In addition, a subclass `RedisChannelPriorityMessageStore` is also provided.
When you use this with a `QueueChannel`, the messages are received in (FIFO) priority order.
It uses the standard `IntegrationMessageHeaderAccessor.PRIORITY` header and supports priority values (`0 - 9`).
Messages with other priorities (and messages with no priority) are retrieved in FIFO order after any messages with priority.
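
The retrieval order described above can be modeled in plain Java. The following sketch keeps one FIFO list per supported priority plus one list for everything else. It assumes, as with Spring Integration's `PriorityChannel`, that a higher number means a higher priority; the class name `PriorityOrderSketch` is hypothetical, and the real store keeps its lists in Redis rather than in memory.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory model of the priority retrieval order.
final class PriorityOrderSketch {

    // Indexes 0..9 hold messages with those priorities; index 10 holds the rest.
    private final List<Deque<String>> lists = new ArrayList<>();

    PriorityOrderSketch() {
        for (int i = 0; i <= 10; i++) {
            lists.add(new ArrayDeque<>());
        }
    }

    // priority may be null (no priority header) or outside the range 0..9.
    void add(String payload, Integer priority) {
        boolean supported = priority != null && priority >= 0 && priority <= 9;
        lists.get(supported ? priority : 10).addLast(payload);
    }

    // Returns the next payload, or null when nothing is stored.
    String poll() {
        for (int p = 9; p >= 0; p--) { // assumption: priority 9 is served first
            String next = lists.get(p).pollFirst();
            if (next != null) {
                return next;
            }
        }
        return lists.get(10).pollFirst(); // remaining messages, in FIFO order
    }

    public static void main(String[] args) {
        PriorityOrderSketch store = new PriorityOrderSketch();
        store.add("no-priority", null);
        store.add("low", 1);
        store.add("high", 9);
        System.out.println(store.poll()); // high
        System.out.println(store.poll()); // low
        System.out.println(store.poll()); // no-priority
    }
}
```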

|   |These stores implement only `BasicMessageGroupStore` and do not implement `MessageGroupStore`.<br/>They can be used only for situations such as backing a `QueueChannel`.|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

### Redis Metadata Store

Spring Integration 3.0 introduced a new Redis-based [`MetadataStore`](https://docs.spring.io/spring-integration/docs/latest-ga/api/org/springframework/integration/metadata/MetadataStore.html) (see [Metadata Store](./meta-data-store.html#metadata-store)) implementation.
You can use the `RedisMetadataStore` to maintain the state of a `MetadataStore` across application restarts.
You can use this new `MetadataStore` implementation with adapters such as:

* [Feed](./feed.html#feed-inbound-channel-adapter)

* [File](./file.html#file-reading)

* [FTP](./ftp.html#ftp-inbound)

* [SFTP](./sftp.html#sftp-inbound)

To instruct these adapters to use the new `RedisMetadataStore`, declare a Spring bean named `metadataStore`.
The Feed inbound channel adapter automatically picks up and uses the declared `RedisMetadataStore`.
The following example shows how to declare such a bean:

```
<bean name="metadataStore" class="o.s.i.redis.store.metadata.RedisMetadataStore">
    <constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
```

The `RedisMetadataStore` is backed by [`RedisProperties`](https://docs.spring.io/spring-data/data-redis/docs/current/api/org/springframework/data/redis/support/collections/RedisProperties.html).
Interaction with it uses [`BoundHashOperations`](https://docs.spring.io/spring-data/data-redis/docs/current/api/org/springframework/data/redis/core/BoundHashOperations.html), which, in turn, requires a `key` for the entire `Properties` store.
In the case of the `MetadataStore`, this `key` plays the role of a region, which is useful in a distributed environment, when several applications use the same Redis server.
By default, this `key` has a value of `MetaData`.
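
The following is a minimal sketch of using a non-default region key. It assumes the two-argument `RedisMetadataStore(RedisConnectionFactory, String)` constructor, and the key value `myAppMetaData` is a hypothetical name chosen for illustration:

```
<bean name="metadataStore" class="o.s.i.redis.metadata.RedisMetadataStore">
    <!-- first argument: the connection factory -->
    <constructor-arg index="0" ref="redisConnectionFactory"/>
    <!-- second argument: the hash key that acts as the region (hypothetical value) -->
    <constructor-arg index="1" value="myAppMetaData"/>
</bean>
```

With a configuration like this, applications that share one Redis server can keep their metadata apart by each using its own key.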

Starting with version 4.0, this store implements `ConcurrentMetadataStore`, letting it be reliably shared across multiple application instances where only one instance is allowed to store or modify a key’s value.

|   |You cannot use the `RedisMetadataStore.replace()` (for example, in the `AbstractPersistentAcceptOnceFileListFilter`) with a Redis cluster, since the `WATCH` command for atomicity is not currently supported.|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

### Redis Store Inbound Channel Adapter

The Redis store inbound channel adapter is a polling consumer that reads data from a Redis collection and sends it as a `Message` payload.
The following example shows how to configure a Redis store inbound channel adapter:

```
<int-redis:store-inbound-channel-adapter id="listAdapter"
    connection-factory="redisConnectionFactory"
    key="myCollection"
    channel="redisChannel"
    collection-type="LIST" >
    <int:poller fixed-rate="2000" max-messages-per-poll="10"/>
</int-redis:store-inbound-channel-adapter>
```

The preceding example shows how to configure a Redis store inbound channel adapter by using the `store-inbound-channel-adapter` element, providing values for various attributes, such as:

* `key` or `key-expression`: The name of the key for the collection being used.

* `collection-type`: An enumeration of the collection types supported by this adapter.
  The supported Collections are `LIST`, `SET`, `ZSET`, `PROPERTIES`, and `MAP`.

* `connection-factory`: Reference to an instance of `o.s.data.redis.connection.RedisConnectionFactory`.

* `redis-template`: Reference to an instance of `o.s.data.redis.core.RedisTemplate`.

* Other attributes that are common across all other inbound adapters (such as `channel`).

|   |You cannot set both `redis-template` and `connection-factory`.|
|---|--------------------------------------------------------------|

|   |By default, the adapter uses a `StringRedisTemplate`.<br/>This uses `StringRedisSerializer` instances for keys, values, hash keys, and hash values.<br/>If your Redis store contains objects that are serialized with other techniques, you must supply a `RedisTemplate` configured with appropriate serializers.<br/>For example, if the store is written to using a Redis store outbound adapter that has its `extract-payload-elements` set to `false`, you must provide a `RedisTemplate` configured as follows:<br/><br/>```<br/><bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"><br/>    <property name="connectionFactory" ref="redisConnectionFactory"/><br/>    <property name="keySerializer"><br/>        <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/><br/>    </property><br/>    <property name="hashKeySerializer"><br/>        <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/><br/>    </property><br/></bean><br/>```<br/><br/>The `RedisTemplate` uses `String` serializers for keys and hash keys and the default JDK Serialization serializers for values and hash values.|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

Because it has a literal value for the `key`, the preceding example is relatively simple and static.
Sometimes, you may need to change the value of the key at runtime based on some condition.
To do so, use `key-expression` instead, where the provided expression can be any valid SpEL expression.

Also, you may wish to perform some post-processing on the successfully processed data that was read from the Redis collection.
For example, you may want to move or remove the value after it has been processed.
You can do so by using the transaction synchronization feature that was added with Spring Integration 2.2.
The following example uses `key-expression` and transaction synchronization:

```
<int-redis:store-inbound-channel-adapter id="zsetAdapterWithSingleScoreAndSynchronization"
        connection-factory="redisConnectionFactory"
        key-expression="'presidents'"
        channel="otherRedisChannel"
        auto-startup="false"
        collection-type="ZSET">
            <int:poller fixed-rate="1000" max-messages-per-poll="2">
                <int:transactional synchronization-factory="syncFactory"/>
            </int:poller>
</int-redis:store-inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
	<int:after-commit expression="payload.removeByScore(18, 18)"/>
</int:transaction-synchronization-factory>

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
```

You can declare your poller to be transactional by using a `transactional` element.
This element can reference a real transaction manager (for example, if some other part of your flow invokes JDBC).
If you do not have a “real” transaction, you can use an `o.s.i.transaction.PseudoTransactionManager`, which is an implementation of Spring’s `PlatformTransactionManager` and enables the use of the transaction synchronization features of the Redis adapter when there is no actual transaction.

|   |This does not make the Redis activities themselves transactional.<br/>It lets the synchronization of actions be taken before or after success (commit) or after failure (rollback).|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

Once your poller is transactional, you can set an instance of the `o.s.i.transaction.TransactionSynchronizationFactory` on the `transactional` element.
`TransactionSynchronizationFactory` creates an instance of the `TransactionSynchronization`.
For your convenience, we have exposed a default SpEL-based `TransactionSynchronizationFactory`, which lets you configure SpEL expressions, with their execution being coordinated (synchronized) with a transaction.
Expressions for before-commit, after-commit, and after-rollback are supported, together with channels (one for each kind of event) where the evaluation result (if any) is sent.
For each child element, you can specify `expression` and `channel` attributes.
If only the `channel` attribute is present, the received message is sent there as part of the particular synchronization scenario.
If only the `expression` attribute is present and the result of an expression is a non-null value, a message with the result as the payload is generated and sent to a default channel (`NullChannel`) and appears in the logs (at the `DEBUG` level).
If you want the evaluation result to go to a specific channel, add a `channel` attribute.
If the result of an expression is null or void, no message is generated.
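
The combinations of `expression` and `channel` described above can be sketched as follows. The channel names `committedChannel` and `rolledBackChannel` are hypothetical and are assumed to be defined elsewhere with subscribers:

```
<int:transaction-synchronization-factory id="syncFactory">
    <!-- expression and channel: a non-null evaluation result is sent to committedChannel -->
    <int:after-commit expression="payload.removeByScore(18, 18)" channel="committedChannel"/>
    <!-- channel only: the received message itself is sent to rolledBackChannel -->
    <int:after-rollback channel="rolledBackChannel"/>
</int:transaction-synchronization-factory>
```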

For more information about transaction synchronization, see [Transaction Synchronization](./transactions.html#transaction-synchronization).

### RedisStore Outbound Channel Adapter

The RedisStore outbound channel adapter lets you write a message payload to a Redis collection, as the following example shows:

```
<int-redis:store-outbound-channel-adapter id="redisListAdapter"
          collection-type="LIST"
          channel="requestChannel"
          key="myCollection" />
```

The preceding configuration defines a Redis store outbound channel adapter by using the `store-outbound-channel-adapter` element.
It provides values for various attributes, such as:

* `key` or `key-expression`: The name of the key for the collection being used.

* `extract-payload-elements`: If set to `true` (the default) and the payload is an instance of a “multi-value” object (that is, a `Collection` or a `Map`), it is stored by using “addAll” and “putAll” semantics.
  Otherwise, if set to `false`, the payload is stored as a single entry regardless of its type.
  If the payload is not an instance of a “multi-value” object, the value of this attribute is ignored and the payload is always stored as a single entry.

* `collection-type`: An enumeration of the `Collection` types supported by this adapter.
  The supported Collections are `LIST`, `SET`, `ZSET`, `PROPERTIES`, and `MAP`.

* `map-key-expression`: SpEL expression that returns the name of the key for the entry being stored.
  It applies only if the `collection-type` is `MAP` or `PROPERTIES` and `extract-payload-elements` is `false`.

* `connection-factory`: Reference to an instance of `o.s.data.redis.connection.RedisConnectionFactory`.

* `redis-template`: Reference to an instance of `o.s.data.redis.core.RedisTemplate`.

* Other attributes that are common across all other outbound adapters (such as `channel`).

|   |You cannot set both `redis-template` and `connection-factory`.|
|---|--------------------------------------------------------------|

|   |By default, the adapter uses a `StringRedisTemplate`.<br/>This uses `StringRedisSerializer` instances for keys, values, hash keys, and hash values.<br/>However, if `extract-payload-elements` is set to `false`, a `RedisTemplate` that has `StringRedisSerializer` instances for keys and hash keys and `JdkSerializationRedisSerializer` instances for values and hash values will be used.<br/>With the JDK serializer, it is important to understand that Java serialization is used for all values, regardless of whether the value is actually a collection or not.<br/>If you need more control over the serialization of values, consider providing your own `RedisTemplate` rather than relying upon these defaults.|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

Because it has literal values for the `key` and other attributes, the preceding example is relatively simple and static.
Sometimes, you may need to change the values dynamically at runtime based on some condition.
To do so, use their `-expression` equivalents (`key-expression`, `map-key-expression`, and so on), where the provided expression can be any valid SpEL expression.
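
As a sketch of the dynamic variant, the following configuration resolves both the collection key and the map entry key from message headers. The header names `collectionKey` and `entryKey` and the adapter id are hypothetical, chosen only for illustration:

```
<int-redis:store-outbound-channel-adapter id="redisMapAdapter"
          collection-type="MAP"
          channel="requestChannel"
          key-expression="headers['collectionKey']"
          map-key-expression="headers['entryKey']"
          extract-payload-elements="false" />
```

Here `extract-payload-elements` is set to `false` because `map-key-expression` applies only in that case, so the whole payload is stored as a single map entry under the evaluated entry key.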

### Redis Outbound Command Gateway

Spring Integration 4.0 introduced the Redis command gateway to let you perform any standard Redis command by using the generic `RedisConnection#execute` method.
The following listing shows the available attributes for the Redis outbound gateway:

```
<int-redis:outbound-gateway
        request-channel=""  (1)
        reply-channel=""  (2)
        requires-reply=""  (3)
        reply-timeout=""  (4)
        connection-factory=""  (5)
        redis-template=""  (6)
        arguments-serializer=""  (7)
        command-expression=""  (8)
        argument-expressions=""  (9)
        use-command-variable=""  (10)
        arguments-strategy="" /> (11)
```

|**1** |                                                                                                                         The `MessageChannel` from which this endpoint receives `Message` instances.                                                                                                                          |
|------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|**2** |                                                                                                                          The `MessageChannel` where this endpoint sends reply `Message` instances.                                                                                                                           |
|**3** |                                                                         Specifies whether this outbound gateway must return a non-null value.<br/>It defaults to `true`.<br/>A `ReplyRequiredException` is thrown when Redis returns a `null` value.                                                                         |
|**4** |                                                                                          The timeout (in milliseconds) to wait until the reply message is sent.<br/>It is typically applied for queue-based limited reply-channels.                                                                                          |
|**5** |                                                                                  A reference to a `RedisConnectionFactory` bean.<br/>It defaults to `redisConnectionFactory`.<br/>It is mutually exclusive with 'redis-template' attribute.                                                                                  |
|**6** |                                                                                                           A reference to a `RedisTemplate` bean.<br/>It is mutually exclusive with 'connection-factory' attribute.                                                                                                           |
|**7** |                                                                            A reference to an instance of `org.springframework.data.redis.serializer.RedisSerializer`.<br/>It is used to serialize each command argument to byte[], if necessary.                                                                             |
|**8** |                                                                                         The SpEL expression that returns the command key.<br/>It defaults to the `redis_command` message header.<br/>It must not evaluate to `null`.                                                                                         |
|**9** |Comma-separated SpEL expressions that are evaluated as command arguments.<br/>Mutually exclusive with the `arguments-strategy` attribute.<br/>If you provide neither attribute, the `payload` is used as the command arguments.<br/>The argument expressions can evaluate to 'null' to support a variable number of arguments.|
|**10**|              A `boolean` flag to specify whether the evaluated Redis command string is made available as the `#cmd` variable in the expression evaluation context in the `o.s.i.redis.outbound.ExpressionArgumentsStrategy` when `argument-expressions` is configured.<br/>Otherwise this attribute is ignored.              |
|**11**|                                               Reference to an instance of `o.s.i.redis.outbound.ArgumentsStrategy`.<br/>It is mutually exclusive with `argument-expressions` attribute.<br/>If you provide neither attribute, the `payload` is used as the command arguments.                                                |

You can use the `<int-redis:outbound-gateway>` as a common component to perform any desired Redis operation.
The following example shows how to get incremented values from a Redis atomic number:

```
<int-redis:outbound-gateway request-channel="requestChannel"
    reply-channel="replyChannel"
    command-expression="'INCR'"/>
```

The `Message` payload should be the name of the counter key (here, `redisCounter`), which may be provided by an `org.springframework.data.redis.support.atomic.RedisAtomicInteger` bean definition.

The `RedisConnection#execute` method has a generic `Object` as its return type.
The actual result depends on the command type.
For example, `MGET` returns a `List<byte[]>`.
For more information about commands, their arguments and result type, see [Redis Specification](https://redis.io/commands).
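
When a command takes more than one argument, `argument-expressions` can supply them. The following is a minimal sketch for a `SET` command, assuming the key arrives in a hypothetical `targetKey` header and the value is the payload (the channel names are also illustrative):

```
<int-redis:outbound-gateway request-channel="setChannel"
    reply-channel="replyChannel"
    command-expression="'SET'"
    argument-expressions="headers['targetKey'], payload"/>
```

Each evaluated argument is serialized to `byte[]` with the configured `arguments-serializer` if necessary, so a custom serializer may be needed for non-`String` payloads.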

### Redis Queue Outbound Gateway

Spring Integration introduced the Redis queue outbound gateway to perform request and reply scenarios.
It pushes a conversation `UUID` to the provided `queue`, pushes the value with that `UUID` as its key to a Redis list, and waits for the reply from a Redis list with a key of `UUID` plus `.reply`.
A different UUID is used for each interaction.
The following listing shows the available attributes for a Redis queue outbound gateway:

```
<int-redis:queue-outbound-gateway
        request-channel=""  (1)
        reply-channel=""  (2)
        requires-reply=""  (3)
        reply-timeout=""  (4)
        connection-factory=""  (5)
        queue=""  (6)
        order=""  (7)
        serializer=""  (8)
        extract-payload=""/>  (9)
```

|**1**|                                                                                                   The `MessageChannel` from which this endpoint receives `Message` instances.                                                                                                    |
|-----|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|**2**|                                                                                                    The `MessageChannel` where this endpoint sends reply `Message` instances.                                                                                                     |
|**3**|Specifies whether this outbound gateway must return a non-null value.<br/>This value is `false` by default.<br/>When set to `true`, a `ReplyRequiredException` is thrown when Redis returns a `null` value.|
|**4**|                                                                    The timeout (in milliseconds) to wait until the reply message is sent.<br/>It is typically applied for queue-based limited reply-channels.                                                                    |
|**5**|                                                          A reference to a `RedisConnectionFactory` bean.<br/>It defaults to `redisConnectionFactory`.<br/>It is mutually exclusive with the 'redis-template' attribute.                                                          |
|**6**|                                                                                              The name of the Redis list to which the outbound gateway sends a conversation `UUID`.                                                                                               |
|**7**|                                                                                                    The order of this outbound gateway when multiple gateways are registered.                                                                                                     |
|**8**|The `RedisSerializer` bean reference.<br/>It can be an empty string, which means “no serializer”.<br/>In this case, the raw `byte[]` from the inbound Redis message is sent to the `channel` as the `Message` payload.<br/>By default, it is a `JdkSerializationRedisSerializer`. |
|**9**|Specifies whether this endpoint expects data from the Redis queue to contain entire `Message` instances.<br/>If this attribute is set to `true`, the `serializer` cannot be an empty string, because messages require some form of deserialization (JDK serialization by default).|
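
As a minimal sketch, a filled-in configuration might look like the following. The channel names and the list name `orders.requests` are hypothetical; only attributes from the preceding listing are used:

```
<int-redis:queue-outbound-gateway
        request-channel="toRedisRequests"
        reply-channel="fromRedisReplies"
        connection-factory="redisConnectionFactory"
        queue="orders.requests"/>
```

With this configuration, each message sent to `toRedisRequests` starts a new conversation on the `orders.requests` list, and the reply, once available, is sent to `fromRedisReplies`.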

### Redis Queue Inbound Gateway

Spring Integration 4.1 introduced the Redis queue inbound gateway to perform request and reply scenarios.
It pops a conversation `UUID` from the provided `queue`, pops the value with that `UUID` as its key from the Redis list, and pushes the reply to the Redis list with a key of `UUID` plus `.reply`.
The following listing shows the available attributes for a Redis queue inbound gateway:

```
<int-redis:queue-inbound-gateway
        request-channel=""  (1)
        reply-channel=""  (2)
        executor=""  (3)
        reply-timeout=""  (4)
        connection-factory=""  (5)
        queue=""  (6)
        order=""  (7)
        serializer=""  (8)
        receive-timeout=""  (9)
        expect-message=""  (10)
        recovery-interval=""/>  (11)
```

|**1** |                                                                                                                                                                              The `MessageChannel` where this endpoint sends `Message` instances created from the Redis data.                                                                                                                                                                              |
|------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|**2** |                                                                                                                                                       The `MessageChannel` from where this endpoint waits for reply `Message` instances.<br/>Optional - the `replyChannel` header is still in use.                                                                                                                                                        |
|**3** |                                                                                                                                       A reference to a Spring `TaskExecutor` (or a standard JDK `Executor`) bean.<br/>It is used for the underlying listening task.<br/>It defaults to a `SimpleAsyncTaskExecutor`.                                                                                                                                       |
|**4** |                                                                                                                                                        The timeout (in milliseconds) to wait until the reply message is sent.<br/>It is typically applied for queue-based limited reply-channels.                                                                                                                                                         |
|**5** |                                                                                                                                                A reference to a `RedisConnectionFactory` bean.<br/>It defaults to `redisConnectionFactory`.<br/>It is mutually exclusive with 'redis-template' attribute.                                                                                                                                                 |
|**6** |                                                                                                                                                                                                  The name of the Redis list for the conversation `UUID`.                                                                                                                                                                                                  |
|**7** |                                                                                                                                                                                         The order of this inbound gateway when multiple gateways are registered.                                                                                                                                                                                          |
|**8** |The `RedisSerializer` bean reference.<br/>It can be an empty string, which means “no serializer”.<br/>In this case, the raw `byte[]` from the inbound Redis message is sent to the `channel` as the `Message` payload.<br/>It defaults to a `JdkSerializationRedisSerializer`.<br/>(Note that, in releases before version 4.3, it was a `StringRedisSerializer` by default.<br/>To restore that behavior, provide a reference to a `StringRedisSerializer`).|
|**9** |                                                                                                                                                     The timeout (in milliseconds) to wait until the receive message is fetched.<br/>It is typically applied for queue-based limited request-channels.                                                                                                                                                     |
|**10**|                                                                                    Specifies whether this endpoint expects data from the Redis queue to contain entire `Message` instances.<br/>If this attribute is set to `true`, the `serializer` cannot be an empty string, because messages require some form of deserialization (JDK serialization by default).                                                                                     |
|**11**|                                                                                                                                                       The time (in milliseconds) the listener task should sleep after exceptions on the “right pop” operation before restarting the listener task.                                                                                                                                                        |
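
The following sketch shows one possible filled-in configuration. The channel name `ordersIn`, the list name `orders.requests`, and the `orderService` bean with its `handle` method are hypothetical names used only for illustration:

```
<int-redis:queue-inbound-gateway
        request-channel="ordersIn"
        connection-factory="redisConnectionFactory"
        queue="orders.requests"
        receive-timeout="3000"
        recovery-interval="5000"/>

<!-- Hypothetical handler; its return value becomes the reply -->
<int:service-activator input-channel="ordersIn" ref="orderService" method="handle"/>
```

No `reply-channel` is set here, so the reply is routed through the `replyChannel` header and pushed to the Redis list whose key is the conversation `UUID` plus `.reply`.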

|   |The `task-executor` has to be configured with more than one thread for processing; otherwise there is a possible deadlock when the `RedisQueueMessageDrivenEndpoint` tries to restart the listener task after an error.<br/>The `errorChannel` can be used to process those errors, to avoid restarts, but it is preferable to not expose your application to the possible deadlock situation.<br/>See the Spring Framework [Reference Manual](https://docs.spring.io/spring/docs/current/spring-framework-reference/integration.html#scheduling-task-executor-types) for possible `TaskExecutor` implementations.|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

### Redis Stream Outbound Channel Adapter

Spring Integration 5.4 introduced the Reactive Redis Stream outbound channel adapter to write a `Message` payload into a Redis stream.
The outbound channel adapter uses `ReactiveStreamOperations.add(…)` to add a `Record` to the stream.
The following example shows how to configure the Redis Stream outbound channel adapter with Java configuration:

```
@Bean
@ServiceActivator(inputChannel = "messageChannel")
public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
        ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
    ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
        new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey"); (1)
    reactiveStreamMessageHandler.setSerializationContext(serializationContext); (2)
    reactiveStreamMessageHandler.setHashMapper(hashMapper); (3)
    reactiveStreamMessageHandler.setExtractPayload(true); (4)
    return reactiveStreamMessageHandler;
}
```

|**1**|Construct an instance of `ReactiveRedisStreamMessageHandler` using `ReactiveRedisConnectionFactory` and stream name to add records.<br/>Another constructor variant is based on a SpEL expression to evaluate a stream key against a request message.|
|-----|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|**2**|                                                                       Set a `RedisSerializationContext` used to serialize a record key and value before adding to the stream.                                                                       |
|**3**|Set the `HashMapper`, which provides the contract between Java types and Redis hashes/maps.|
|**4**|If `true`, the channel adapter extracts the payload from a request message for the stream record to add.<br/>Otherwise, the whole message is used as the value.<br/>It defaults to `true`.|

### Redis Stream Inbound Channel Adapter

Spring Integration 5.4 introduced the Reactive Redis Stream inbound channel adapter for reading messages from a Redis stream.
The inbound channel adapter uses `StreamReceiver.receive(…)` or `StreamReceiver.receiveAutoAck()`, depending on the auto-acknowledgement flag, to read records from the Redis stream.
The following example shows how to configure the Redis Stream inbound channel adapter with Java configuration:

```
@Bean
public ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer(
       ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
    ReactiveRedisStreamMessageProducer messageProducer =
            new ReactiveRedisStreamMessageProducer(reactiveRedisConnectionFactory, "myStreamKey"); (1)
    messageProducer.setStreamReceiverOptions( (2)
                StreamReceiver.StreamReceiverOptions.builder()
                      .pollTimeout(Duration.ofMillis(100))
                      .build());
    messageProducer.setAutoStartup(true); (3)
    messageProducer.setAutoAck(false); (4)
    messageProducer.setCreateConsumerGroup(true); (5)
    messageProducer.setConsumerGroup("my-group"); (6)
    messageProducer.setConsumerName("my-consumer"); (7)
    messageProducer.setOutputChannel(fromRedisStreamChannel); (8)
    messageProducer.setReadOffset(ReadOffset.latest()); (9)
    messageProducer.setExtractPayload(true); (10)
    return messageProducer;
}
```

|**1** |                                                               Construct an instance of `ReactiveRedisStreamMessageProducer` using `ReactiveRedisConnectionFactory` and stream key to read records.                                                               |
|------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|**2** |The `StreamReceiver.StreamReceiverOptions` used to consume the Redis stream through the reactive infrastructure.|
|**3** |A `SmartLifecycle` attribute that specifies whether this endpoint should start automatically after the application context starts.<br/>It defaults to `true`.<br/>If `false`, the `ReactiveRedisStreamMessageProducer` must be started manually by calling `messageProducer.start()`.|
|**4** |If `false`, received messages are not acknowledged automatically.<br/>Acknowledgment is deferred to the client that consumes the message.<br/>It defaults to `true`.|
|**5** |If `true`, a consumer group is created.<br/>The stream is also created along with the consumer group if it does not exist yet.<br/>A consumer group tracks message delivery and distinguishes between consumers.<br/>It defaults to `false`.|
|**6** |Set the consumer group name.<br/>It defaults to the defined bean name.|
|**7** |Set the consumer name.<br/>In this example, messages are read as `my-consumer` from group `my-group`.|
|**8** |The message channel to which messages from this endpoint are sent.|
|**9** |Define the offset from which to read messages.<br/>It defaults to `ReadOffset.latest()`.|
|**10**|If `true`, the channel adapter extracts the payload value from the `Record`.<br/>Otherwise, the whole `Record` is used as the payload.<br/>It defaults to `true`.|

If `autoAck` is set to `false`, the `Record` in the Redis Stream is not acknowledged automatically by the Redis driver. Instead, an `IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK` header, with a `SimpleAcknowledgment` instance as its value, is added to the produced message.
It is the responsibility of the target integration flow to call its `acknowledge()` callback once the business logic for the message based on that record is done.
Similar logic is required even when an exception happens during deserialization and an `errorChannel` is configured.
So, the target error handler must decide whether to ack or nack such a failed message.
Alongside `IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK`, the `ReactiveRedisStreamMessageProducer` also populates the following headers in the produced message: `RedisHeaders.STREAM_KEY`, `RedisHeaders.STREAM_MESSAGE_ID`, `RedisHeaders.CONSUMER_GROUP` and `RedisHeaders.CONSUMER`.
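
The manual acknowledgment described above could be handled along the following lines.
This is a minimal sketch rather than the framework's prescribed approach: the class name `StreamRecordHandler`, the channel bean name `fromRedisStreamChannel`, and the `process(…)` method are assumptions made for illustration, and the class is assumed to be picked up by component scanning.

```java
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.acks.SimpleAcknowledgment;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.redis.support.RedisHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class StreamRecordHandler {

    // "fromRedisStreamChannel" is an assumed bean name for the producer's output channel.
    @ServiceActivator(inputChannel = "fromRedisStreamChannel")
    public void handle(Message<?> message) {
        // The callback header may be absent, for example when autoAck is true.
        SimpleAcknowledgment ack = message.getHeaders()
                .get(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, SimpleAcknowledgment.class);
        Object recordId = message.getHeaders().get(RedisHeaders.STREAM_MESSAGE_ID);

        // Hypothetical business logic; if it throws, the record stays unacknowledged.
        process(message.getPayload(), recordId);

        // Acknowledge only after the business logic has completed.
        if (ack != null) {
            ack.acknowledge();
        }
    }

    private void process(Object payload, Object recordId) {
        // Placeholder for application-specific processing.
    }
}
```

Acknowledging after `process(…)` returns means that a failure leaves the record unacknowledged in the consumer group, so the error handling strategy has to decide what happens to it.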

Starting with version 5.5, you can configure `StreamReceiver.StreamReceiverOptionsBuilder` options explicitly on the `ReactiveRedisStreamMessageProducer`, including the newly introduced `onErrorResume` function, which is required if the Redis Stream consumer should continue polling when deserialization errors occur.
The default function sends a message to the error channel (if provided), with a possible acknowledgment for the failed message, as described above.
All these `StreamReceiver.StreamReceiverOptionsBuilder` options are mutually exclusive with an externally provided `StreamReceiver.StreamReceiverOptions`.

### Redis Lock Registry

Spring Integration 4.0 introduced the `RedisLockRegistry`.
Certain components (for example, aggregator and resequencer) use a lock obtained from a `LockRegistry` instance to ensure that only one thread manipulates a group at a time.
The `DefaultLockRegistry` performs this function within a single component.
You can now configure an external lock registry on these components.
When you use it with a shared `MessageGroupStore`, you can use the `RedisLockRegistry` to provide this functionality across multiple application instances, such that only one instance can manipulate the group at a time.

When a lock is released by a local thread, another local thread can generally acquire the lock immediately.
If a lock is released by a thread using a different registry instance, it can take up to 100ms to acquire the lock.

To avoid “hung” locks (when a server fails), locks in this registry expire after 60 seconds by default, but you can configure this value on the registry.
Locks are normally held for a much shorter time.

|   |Because the keys can expire, an attempt to unlock an expired lock results in an exception being thrown.<br/>However, the resources protected by such a lock may have been compromised, so such exceptions should be considered to be severe.<br/>You should set the expiry at a large enough value to prevent this condition, but set it low enough that the lock can be recovered after a server failure in a reasonable amount of time.|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
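
A lock obtained from a `LockRegistry` is a plain `java.util.concurrent.locks.Lock`, so the calling pattern implied above can be sketched with JDK types only.
The helper below is an illustration, not part of Spring Integration: `GuardedAccess` and `withLock` are hypothetical names, and the `ReentrantLock` in `main` merely stands in for a lock that an application would obtain from the registry.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

final class GuardedAccess {

    private GuardedAccess() {
    }

    /**
     * Runs the work while holding the lock and fails fast if the lock
     * cannot be acquired within the given timeout.
     */
    static <T> T withLock(Lock lock, long timeoutMillis, Supplier<T> work) {
        boolean acquired;
        try {
            acquired = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("Interrupted while waiting for the lock", ex);
        }
        if (!acquired) {
            throw new IllegalStateException("Lock not acquired within " + timeoutMillis + " ms");
        }
        try {
            return work.get();
        }
        finally {
            // With an expiring lock, unlock() may throw if the lock expired while the
            // work was running. The exception is deliberately not swallowed, because
            // the protected resource may have been modified concurrently.
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        Lock lock = new ReentrantLock(); // stand-in for a lock obtained from a registry
        String result = withLock(lock, 100, () -> "group updated");
        System.out.println(result);
    }
}
```

Keeping the guarded work short relative to the configured expiry is what prevents the exception on `unlock()` in the first place.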

Starting with version 5.0, the `RedisLockRegistry` implements `ExpirableLockRegistry`, which allows the removal of locks that were last acquired more than `age` ago and are not currently locked.

Starting with version 5.5.6, the `RedisLockRegistry` supports automatic cleanup of the cache of Redis locks held in `RedisLockRegistry.locks`, which you can configure through `RedisLockRegistry.setCacheCapacity()`.
See its JavaDocs for more information.
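
The expiry, cleanup, and cache settings discussed in this section might be wired together as in the following configuration sketch.
The registry key `my-app-locks`, the numeric values, and the `IdleLockCleaner` class are assumptions chosen for illustration; the sketch also assumes that `setCacheCapacity()` takes an `int`, that scheduling is enabled, and that `IdleLockCleaner` is picked up by component scanning.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.integration.redis.util.RedisLockRegistry;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Configuration
@EnableScheduling
public class LockRegistryConfig {

    @Bean
    public RedisLockRegistry redisLockRegistry(RedisConnectionFactory connectionFactory) {
        // Locks expire after 30 seconds instead of the default 60 (assumed value).
        RedisLockRegistry registry = new RedisLockRegistry(connectionFactory, "my-app-locks", 30_000L);
        // Bound the local cache of lock instances (available since 5.5.6; assumed value).
        registry.setCacheCapacity(1_000);
        return registry;
    }
}

@Component
class IdleLockCleaner {

    private final RedisLockRegistry registry;

    IdleLockCleaner(RedisLockRegistry registry) {
        this.registry = registry;
    }

    // Once a minute, remove locks last acquired more than five minutes ago
    // that are not currently locked.
    @Scheduled(fixedDelay = 60_000L)
    public void purgeIdleLocks() {
        this.registry.expireUnusedOlderThan(300_000L);
    }
}
```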