The Spring for Apache Kafka project applies core Spring concepts to the development of Kafka-based messaging solutions.
We provide a “template” as a high-level abstraction for sending messages.
We also provide support for Message-driven POJOs.
## [](#whats-new-part)2. What’s new?
### [](#spring-kafka-intro-new)2.1. What’s New in 2.8 Since 2.7
This section covers the changes made from version 2.7 to version 2.8.
For changes in earlier versions, see [[history]](#history).
#### [](#x28-kafka-client)2.1.1. Kafka Client Version
This version requires the 3.0.0 `kafka-clients`
...
...
See [Exactly Once Semantics](#exactly-once) and [KIP-447](https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics) for more information.
#### [](#x28-packages)2.1.2. Package Changes
Classes and interfaces related to type mapping have been moved from `…support.converter` to `…support.mapping`.
...
...
@@ -34,13 +34,13 @@ Classes and interfaces related to type mapping have been moved from `…suppo
* `Jackson2JavaTypeMapper`
#### [](#x28-ooo-commits)2.1.3. Out of Order Manual Commits
The listener container can now be configured to accept manual offset commits out of order (usually asynchronously).
The container will defer the commit until the missing offset is acknowledged.
See [Manually Committing Offsets](#ooo-commits) for more information.
You can now receive a single record, given the topic, partition and offset.
See [Using `KafkaTemplate` to Receive](#kafka-template-receive) for more information.
#### [](#x28-eh)2.1.6. `CommonErrorHandler` Added
The legacy `GenericErrorHandler` and its sub-interface hierarchies for record and batch listeners have been replaced by a new single interface, `CommonErrorHandler`, with implementations corresponding to most legacy implementations of `GenericErrorHandler`.
See [Container Error Handlers](#error-handlers) for more information.
Now you can use the same factory for retryable and non-retryable topics.
See [Specifying a ListenerContainerFactory](#retry-topic-lcf) for more information.
...
...
The `KafkaBackOffException` thrown when using the retryable topics feature is now logged at DEBUG level.
See [[change-kboe-logging-level]](#change-kboe-logging-level) if you need to change the logging level back to WARN or set it to any other level.
## [](#introduction)3. Introduction
This first part of the reference documentation is a high-level overview of Spring for Apache Kafka, the underlying concepts, and some code snippets that can help you get up and running as quickly as possible.
### [](#quick-tour)3.1. Quick Tour
Prerequisites: You must install and run Apache Kafka.
Then you must put the Spring for Apache Kafka (`spring-kafka`) JAR and all of its dependencies on your class path.
However, the quickest way to get started is to use [start.spring.io](https://start.spring.io) (or the wizards in Spring Tool Suite and IntelliJ IDEA) and create a project, selecting 'Spring for Apache Kafka' as a dependency.
#### [](#compatibility)3.1.1. Compatibility
This quick tour works with the following versions:
...
...
* Minimum Java version: 8
#### [](#getting-started)3.1.2. Getting Started
The simplest way to get started is to use [start.spring.io](https://start.spring.io) (or the wizards in Spring Tool Suite and IntelliJ IDEA) and create a project, selecting 'Spring for Apache Kafka' as a dependency.
Refer to the [Spring Boot documentation](https://docs.spring.io/spring-boot/docs/current/reference/html/spring-boot-features.html#boot-features-kafka) for more information about its opinionated auto configuration of the infrastructure beans.
##### [](#with-java-configuration-no-spring-boot)With Java Configuration (No Spring Boot)
| |Spring for Apache Kafka is designed to be used in a Spring Application Context.<br/>For example, if you create the listener container yourself outside of a Spring context, not all functions will work unless you satisfy all of the `…Aware` interfaces that the container implements.|
As you can see, you have to define several infrastructure beans when not using Spring Boot.
## [](#reference)4. Reference
This part of the reference documentation details the various components that comprise Spring for Apache Kafka.
The [main chapter](#kafka) covers the core classes to develop a Kafka application with Spring.
### [](#kafka)4.1. Using Spring for Apache Kafka
This section offers detailed explanations of the various concerns that impact using Spring for Apache Kafka.
For a quick but less detailed introduction, see [Quick Tour](#quick-tour).
#### [](#connecting)4.1.1. Connecting to Kafka
* `KafkaAdmin` - see [Configuring Topics](#configuring-topics)
...
...
When using `@KafkaListener` s, `stop()` and `start()` the `KafkaListenerEndpointRegistry` instead.
See the Javadocs for more information.
##### [](#factory-listeners)Factory Listeners
Starting with version 2.5, the `DefaultKafkaProducerFactory` and `DefaultKafkaConsumerFactory` can be configured with a `Listener` to receive notifications whenever a producer or consumer is created or closed.
...
...
These listeners can be used, for example, to create and bind a Micrometer `KafkaClientMetrics` instance when a new client is created (and close it when the client is closed).
The framework provides listeners that do exactly that; see [Micrometer Native Metrics](#micrometer-native).
If you define a `KafkaAdmin` bean in your application context, it can automatically add topics to the broker.
To do so, you can add a `NewTopic` `@Bean` for each topic to the application context.
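For example, a minimal configuration might look like the following sketch (the broker address and topic settings here are illustrative assumptions, not requirements):

```java
@Configuration
public class KafkaTopicConfiguration {

    @Bean
    public KafkaAdmin admin() {
        // the broker address is an assumption for this sketch
        return new KafkaAdmin(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"));
    }

    @Bean
    public NewTopic topic1() {
        // the topic is created on startup if it does not already exist
        return TopicBuilder.name("thing1")
                .partitions(10)
                .replicas(3)
                .build();
    }
}
```

With Spring Boot, a `KafkaAdmin` bean is registered automatically, so you only need to declare the `NewTopic` beans.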
...
...
For more advanced features, you can use the `AdminClient` directly:

```java
@Autowired
private KafkaAdmin admin;
...
    AdminClient client = AdminClient.create(admin.getConfigurationProperties());
    ...
    client.close();
```
#### [](#sending-messages)4.1.3. Sending Messages
This section covers how to send messages.
##### [](#kafka-template)Using `KafkaTemplate`
This section covers how to use `KafkaTemplate` to send messages.
###### [](#overview)Overview
The `KafkaTemplate` wraps a producer and provides convenience methods to send data to Kafka topics.
The following listing shows the relevant methods from `KafkaTemplate`:
...
...
If you wish to block the sending thread to await the result, you can invoke the future’s `get()` method; using the method with a timeout is recommended.
You may wish to invoke `flush()` before waiting or, for convenience, the template has a constructor with an `autoFlush` parameter that causes the template to `flush()` on each send.
Flushing is only needed if you have set the `linger.ms` producer property and want to immediately send a partial batch.
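For example, a hedged sketch of blocking for the result, assuming a `KafkaTemplate<String, String>` named `template` (the topic name is a placeholder):

```java
public void sendToKafka(String data) throws Exception {
    // send() returns a ListenableFuture<SendResult<K, V>>; get(timeout) blocks until
    // the broker acknowledges the record (or the timeout elapses)
    SendResult<String, String> result = template.send("myTopic", data).get(10, TimeUnit.SECONDS);
    // result.getRecordMetadata() carries the partition and offset assigned by the broker
}
```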
###### [](#examples)Examples
This section shows examples of sending messages to Kafka:
...
...
Note that the cause of the `ExecutionException` is `KafkaProducerException` with the `failedProducerRecord` property.
Starting with version 2.5, you can use a `RoutingKafkaTemplate` to select the producer at runtime, based on the destination `topic` name.
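A sketch, adapted to a hypothetical setup where values for topics named `two` are serialized as `byte[]` and everything else as `String`:

```java
@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
        ProducerFactory<Object, Object> pf) {

    // clone the default factory with a different value serializer and register it
    // with Spring so that it is shut down cleanly
    Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
    context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);

    Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
    map.put(Pattern.compile("two"), bytesPF);
    map.put(Pattern.compile(".+"), pf); // the default factory, e.g. with a StringSerializer
    return new RoutingKafkaTemplate(map);
}
```

Note that the map should be ordered (for example, a `LinkedHashMap`), because the patterns are tried in order.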
...
...
For another technique to achieve similar results, but with the additional capability of sending different types to the same topic, see [Delegating Serializer and Deserializer](#delegating-serialization).
Starting with version 2.8, if you provide serializers as objects (in the constructor or via the setters), the factory will invoke the `configure()` method to configure them with the configuration properties.
Version 2.1.3 introduced a subclass of `KafkaTemplate` to provide request/reply semantics.
The class is named `ReplyingKafkaTemplate` and has two additional methods; the following shows the method signatures:
...
...
These header names are used by the `@KafkaListener` infrastructure to route the reply.
Starting with version 2.3, you can customize the header names - the template has 3 properties `correlationHeaderName`, `replyTopicHeaderName`, and `replyPartitionHeaderName`.
This is useful if your server is not a Spring application (or does not use the `@KafkaListener`).
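A minimal usage sketch (topic names and generic types are placeholders):

```java
RequestReplyFuture<String, String, String> replyFuture =
        template.sendAndReceive(new ProducerRecord<>("kRequests", "someRequest"));
// optionally confirm that the request was sent
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
// block until the reply record arrives (or the timeout elapses)
ConsumerRecord<String, String> reply = replyFuture.get(10, TimeUnit.SECONDS);
```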
###### [](#exchanging-messages)Request/Reply with `Message<?>` s
Version 2.7 added methods to the `ReplyingKafkaTemplate` to send and receive `spring-messaging` 's `Message<?>` abstraction:
...
...
The template in [Using `ReplyingKafkaTemplate`](#replying-template) is strictly for a single request/reply scenario.
For cases where multiple receivers of a single message return a reply, you can use the `AggregatingReplyingKafkaTemplate`.
...
...
| |If you use an [`ErrorHandlingDeserializer`](#error-handling-deserializer) with this aggregating template, the framework will not automatically detect `DeserializationException` s.<br/>Instead, the record (with a `null` value) will be returned intact, with the deserialization exception(s) in headers.<br/>It is recommended that applications call the utility method `ReplyingKafkaTemplate.checkDeserialization()` method to determine if a deserialization exception occurred.<br/>See its javadocs for more information.<br/>The `replyErrorChecker` is also not called for this aggregating template; you should perform the checks on each element of the reply.|
You can receive messages by configuring a `MessageListenerContainer` and providing a message listener or by using the `@KafkaListener` annotation.
##### [](#message-listeners)Message Listeners
When you use a [message listener container](#message-listener-container), you must provide a listener to receive data.
There are currently eight supported interfaces for message listeners.
...
...
| |You should not execute any `Consumer<?, ?>` methods that affect the consumer’s positions and/or committed offsets in your listener; the container needs to manage such information.|
Starting with version 2.8, when creating the consumer factory, if you provide deserializers as objects (in the constructor or via the setters), the factory will invoke the `configure()` method to configure them with the configuration properties.
The single constructor is similar to the `KafkaListenerContainer` constructor.
The following listing shows the constructor’s signature:
...
...
Starting with version 2.3, the `ContainerProperties` provides an `idleBetweenPolls` option to let the main loop in the listener container sleep between `KafkaConsumer.poll()` calls.
The actual sleep interval is selected as the minimum of the provided option and the difference between the `max.poll.interval.ms` consumer config and the current records batch processing time.
###### [](#committing-offsets)Committing Offsets
Several options are provided for committing offsets.
If the `enable.auto.commit` consumer property is `true`, Kafka auto-commits the offsets according to its configuration.
...
...
| |When using partition assignment via group management, it is important to ensure the `sleep` argument (plus the time spent processing records from the previous poll) is less than the consumer `max.poll.interval.ms` property.|
###### [](#container-auto-startup)Listener Container Auto Startup
The listener containers implement `SmartLifecycle`, and `autoStartup` is `true` by default.
The containers are started in a late phase (`Integer.MAX_VALUE - 100`).
Other components that implement `SmartLifecycle`, to handle data from listeners, should be started in an earlier phase.
The `- 100` leaves room for later phases to enable components to be auto-started after the containers.
##### [](#ooo-commits)Manually Committing Offsets
Normally, when using `AckMode.MANUAL` or `AckMode.MANUAL_IMMEDIATE`, the acknowledgments must be acknowledged in order, because Kafka does not maintain state for each record, only a committed offset for each group/partition.
Starting with version 2.8, you can now set the container property `asyncAcks`, which allows the acknowledgments for records returned by the poll to be acknowledged in any order.
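A hedged configuration sketch enabling this behavior on a container factory:

```java
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> factory(ConsumerFactory<String, String> cf) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf);
    // manual acknowledgments that may be acknowledged out of order
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    factory.getContainerProperties().setAsyncAcks(true);
    return factory;
}
```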
...
...
The consumer will be paused (no new records delivered) until all the offsets for the previous polls have been committed.
| |While this feature allows applications to process records asynchronously, it should be understood that it increases the possibility of duplicate deliveries after a failure.|
The `@KafkaListener` annotation is used to designate a bean method as a listener for a listener container.
The bean is wrapped in a `MessagingMessageListenerAdapter` configured with various features, such as converters to convert the data, if necessary, to match the method parameters.
...
...
You can configure most attributes on the annotation with SpEL by using `#{…}` or property placeholders (`${…}`).
See the [Javadoc](https://docs.spring.io/spring-kafka/api/org/springframework/kafka/annotation/KafkaListener.html) for more information.
###### [](#record-listener)Record Listeners
The `@KafkaListener` annotation provides a mechanism for simple POJO listeners.
The following example shows how to use it:
...
...
This contains all the data from the `ConsumerRecord` except the key and value.
###### [](#batch-listeners)Batch Listeners
Starting with version 1.1, you can configure `@KafkaListener` methods to receive the entire batch of consumer records received from the consumer poll.
To configure the listener container factory to create batch listeners, you can set the `batchListener` property.
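For example, a sketch (the factory bean name and topic are assumptions):

```java
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> batchFactory(
        ConsumerFactory<Integer, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true); // create batch listeners
    return factory;
}

@KafkaListener(id = "batchList", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> data) {
    // the whole batch returned by the poll is delivered in one call
}
```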
...
...
| |If the container factory has a `RecordFilterStrategy` configured, it is ignored for `ConsumerRecords<?, ?>` listeners, with a `WARN` log message emitted.<br/>Records can only be filtered with a batch listener if the `<List<?>>` form of listener is used.<br/>By default, records are filtered one-at-a-time; starting with version 2.8, you can override `filterBatch` to filter the entire batch in one call.|
Starting with version 2.0, the `id` property (if present) is used as the Kafka consumer `group.id` property, overriding the configured property in the consumer factory, if present.
You can also set `groupId` explicitly or set `idIsGroup` to false to restore the previous behavior of using the consumer factory `group.id`.
...
...
##### [](#listener-group-id)Obtaining the Consumer `group.id`
When running the same listener code in multiple containers, it may be useful to be able to determine which container (identified by its `group.id` consumer property) a record came from.
...
...
| |This is available in record listeners and batch listeners that receive a `List<?>` of records.<br/>It is **not** available in a batch listener that receives a `ConsumerRecords<?, ?>` argument.<br/>Use the `KafkaUtils` mechanism in that case.|
Listener containers currently use two task executors, one to invoke the consumer and another that is used to invoke the listener when the Kafka consumer property `enable.auto.commit` is `false`.
You can provide custom executors by setting the `consumerExecutor` and `listenerExecutor` properties of the container’s `ContainerProperties`.
...
...
This executor creates threads with names similar to `<beanName>-C-1` (consumer thread).
For the `ConcurrentMessageListenerContainer`, the `<beanName>` part of the thread name becomes `<beanName>-m`, where `m` represents the consumer instance. `n` increments each time the container is started.
So, with a bean name of `container`, threads in this container will be named `container-0-C-1`, `container-1-C-1` etc., after the container is started the first time; `container-0-C-2`, `container-1-C-2` etc., after a stop and subsequent start.
##### [](#kafka-listener-meta)`@KafkaListener` as a Meta Annotation
Starting with version 2.2, you can now use `@KafkaListener` as a meta annotation.
The following example shows how to do so:
...
...
##### [](#class-level-kafkalistener)`@KafkaListener` on a Class
When you use `@KafkaListener` at the class-level, you must specify `@KafkaHandler` at the method level.
When messages are delivered, the converted message payload type is used to determine which method to call.
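For example, a sketch of a multi-method listener (the payload types and topic are placeholders):

```java
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String foo) {
        // called for String payloads
    }

    @KafkaHandler
    public void listen(Integer bar) {
        // called for Integer payloads
    }

    @KafkaHandler(isDefault = true)
    public void listenDefault(Object object) {
        // called when no other handler matches
    }
}
```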
...
...
Starting with version 2.7.2, you can now programmatically modify annotation attributes before the container is created.
To do so, add one or more `KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer` to the application context. `AnnotationEnhancer` is a `BiFunction<Map<String, Object>, AnnotatedElement, Map<String, Object>>` and must return a map of attributes.
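A sketch of such an enhancer (the particular attribute manipulation shown is an illustrative assumption):

```java
@Bean
public static KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer groupIdEnhancer() {
    // must be static - enhancers run very early in the application context life cycle
    return (attrs, element) -> {
        attrs.put("groupId", attrs.get("id") + "." + (element instanceof Class
                ? ((Class<?>) element).getSimpleName()
                : ((Method) element).getDeclaringClass().getSimpleName() + "." + ((Method) element).getName()));
        return attrs;
    };
}
```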
...
...
The listener containers created for `@KafkaListener` annotations are not beans in the application context.
Instead, they are registered with an infrastructure bean of type `KafkaListenerEndpointRegistry`.
...
...
A collection of managed containers can be obtained by calling the registry’s `getListenerContainers()` method.
Version 2.2.5 added a convenience method `getAllListenerContainers()`, which returns a collection of all containers, including those managed by the registry and those declared as beans.
The collection returned will include any prototype beans that have been initialized, but it will not initialize any lazy bean declarations.
`ContainerProperties` has a property called `consumerRebalanceListener`, which takes an implementation of the Kafka client’s `ConsumerRebalanceListener` interface.
If this property is not provided, the container configures a logging listener that logs rebalance events at the `INFO` level.
| |Starting with version 2.4, a new method `onPartitionsLost()` has been added (similar to a method with the same name in `ConsumerRebalanceListener`).<br/>The default implementation on `ConsumerRebalanceListener` simply calls `onPartitionsRevoked`.<br/>The default implementation on `ConsumerAwareRebalanceListener` does nothing.<br/>When supplying the listener container with a custom listener (of either type), it is important that your implementation not call `onPartitionsRevoked` from `onPartitionsLost`.<br/>If you implement `ConsumerRebalanceListener` you should override the default method.<br/>This is because the listener container will call its own `onPartitionsRevoked` from its implementation of `onPartitionsLost` after calling the method on your implementation.<br/>If your implementation delegates to the default behavior, `onPartitionsRevoked` will be called twice each time the `Consumer` calls that method on the container’s listener.|
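For example, a sketch of supplying a `ConsumerAwareRebalanceListener` on the container properties:

```java
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

    @Override
    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // acknowledge any pending Acknowledgments (if using manual acks)
    }

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // ...
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // initialize offsets here, if needed
    }
});
```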
##### [](#annotation-send-to)Forwarding Listener Results using `@SendTo`
Starting with version 2.0, if you also annotate a `@KafkaListener` with a `@SendTo` annotation and the method invocation returns a result, the result is forwarded to the topic specified by the `@SendTo`.
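For example, a minimal sketch (`requests` and `replies` are hypothetical topic names):

```java
@KafkaListener(id = "replying", topics = "requests")
@SendTo("replies")
public String handle(String in) {
    // the return value is forwarded to the "replies" topic
    return in.toUpperCase();
}
```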
...
...
| |If a listener method returns an `Iterable`, by default a record for each element as the value is sent.<br/>Starting with version 2.3.5, set the `splitIterables` property on `@KafkaListener` to `false` and the entire result will be sent as the value of a single `ProducerRecord`.<br/>This requires a suitable serializer in the reply template’s producer configuration.<br/>However, if the reply is `Iterable<Message<?>>` the property is ignored and each message is sent separately.|
In certain scenarios, such as rebalancing, a message that has already been processed may be redelivered.
The framework cannot know whether such a message has been processed or not.
...
...
In addition, a `FilteringBatchMessageListenerAdapter` is provided, for when you use a batch message listener.
| |The `FilteringBatchMessageListenerAdapter` is ignored if your `@KafkaListener` receives a `ConsumerRecords<?, ?>` instead of `List<ConsumerRecord<?,?>>`, because `ConsumerRecords` is immutable.|
See the `DefaultErrorHandler` in [Handling Exceptions](#annotation-error-handling).
##### [](#sequencing)Starting `@KafkaListener` s in Sequence
A common use case is to start a listener after another listener has consumed all the records in a topic.
For example, you may want to load the contents of one or more compacted topics into memory before processing records from other topics.
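One way to sequence the startup is the `ContainerGroupSequencer` (since version 2.7.3); a sketch, assuming listeners assigned to container groups `g1` and `g2`:

```java
@Bean
ContainerGroupSequencer sequencer(KafkaListenerEndpointRegistry registry) {
    // start the containers in group "g1"; when they all go idle, start group "g2"
    return new ContainerGroupSequencer(registry, 5000, "g1", "g2");
}
```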
...
...
As an aside; previously, containers in each group were added to a bean of type `Collection<MessageListenerContainer>`, with the group name as its bean name.
These collections are now deprecated in favor of beans of type `ContainerGroup` with a bean name that is the group name, suffixed with `.group`; in the example above, there would be 2 beans `g1.group` and `g2.group`.
The `Collection` beans will be removed in a future release.
##### [](#kafka-template-receive)Using `KafkaTemplate` to Receive
This section covers how to use `KafkaTemplate` to receive messages.
...
...
As you can see, you need to know the partition and offset of the record(s) you need to retrieve.
With the last two methods, each record is retrieved individually and the results assembled into a `ConsumerRecords` object.
When creating the `TopicPartitionOffset` s for the request, only positive, absolute offsets are supported.
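For example, a hedged sketch (topic, partitions, and offsets are placeholders):

```java
// fetch one record by topic/partition/offset (a new Consumer is created for the operation)
ConsumerRecord<String, String> record = template.receive("myTopic", 0, 42L);

// or fetch several records in one call
ConsumerRecords<String, String> records = template.receive(List.of(
        new TopicPartitionOffset("myTopic", 0, 42L),
        new TopicPartitionOffset("myTopic", 1, 43L)));
```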
The following properties are available on `ContainerProperties`:

|Property|Default|Description|
|---|---|---|
| []()[`ackCount`](#ackCount) | 1 | The number of records before committing pending offsets when the `ackMode` is `COUNT` or `COUNT_TIME`. |
| []()[`adviceChain`](#adviceChain) | `null` | A chain of `Advice` objects (e.g. `MethodInterceptor` around advice) wrapping the message listener, invoked in order. |
| []()[`ackMode`](#ackMode) | BATCH | Controls how often offsets are committed - see [Committing Offsets](#committing-offsets). |
| []()[`ackOnError`](#ackOnError) | `false` | [DEPRECATED in favor of `ErrorHandler.isAckAfterHandle()`] |
| []()[`ackTime`](#ackTime) | 5000 | The time in milliseconds after which pending offsets are committed when the `ackMode` is `TIME` or `COUNT_TIME`. |
| []()[`assignmentCommitOption`](#assignmentCommitOption) | LATEST\_ONLY\_NO\_TX | Whether or not to commit the initial position on assignment; by default, the initial offset will only be committed if the `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` is `latest` and it won’t run in a transaction even if there is a transaction manager present.<br/>See the javadocs for `ContainerProperties.AssignmentCommitOption` for more information about the available options. |
|[]()[`authExceptionRetryInterval`](#authExceptionRetryInterval)| `null` | When not null, a `Duration` to sleep between polls when an `AuthenticationException` or `AuthorizationException` is thrown by the Kafka client.<br/>When null, such exceptions are considered fatal and the container will stop. |
| []()[`clientId`](#clientId) | (empty string) | A prefix for the `client.id` consumer property.<br/>Overrides the consumer factory `client.id` property; in a concurrent container, `-n` is added as a suffix for each consumer instance. |
| []()[`checkDeserExWhenKeyNull`](#checkDeserExWhenKeyNull) | false | Set to `true` to always check for a `DeserializationException` header when a `null` `key` is received.<br/>Useful when the consumer code cannot determine that an `ErrorHandlingDeserializer` has been configured, such as when using a delegating deserializer. |
| []()[`checkDeserExWhenValueNull`](#checkDeserExWhenValueNull) | false | Set to `true` to always check for a `DeserializationException` header when a `null` `value` is received.<br/>Useful when the consumer code cannot determine that an `ErrorHandlingDeserializer` has been configured, such as when using a delegating deserializer. |
| []()[`commitCallback`](#commitCallback) | `null` | When present and `syncCommits` is `false` a callback invoked after the commit completes. |
| []()[`commitLogLevel`](#commitLogLevel) | DEBUG | The logging level for logs pertaining to committing offsets. |
| []()[`consumerRebalanceListener`](#consumerRebalanceListener) | `null` | A rebalance listener; see [Rebalancing Listeners](#rebalance-listeners). |
| []()[`consumerStartTimeout`](#consumerStartTimeout) | 30s | The time to wait for the consumer to start before logging an error; this might happen if, say, you use a task executor with insufficient threads. |
| []()[`consumerTaskExecutor`](#consumerTaskExecutor) |`SimpleAsyncTaskExecutor`| A task executor to run the consumer threads.<br/>The default executor creates threads named `<name>-C-n`; with the `KafkaMessageListenerContainer`, the name is the bean name; with the `ConcurrentMessageListenerContainer` the name is the bean name suffixed with `-n` where n is incremented for each child container. |
| []()[`deliveryAttemptHeader`](#deliveryAttemptHeader) | `false` | See [Delivery Attempts Header](#delivery-header). |
| []()[`eosMode`](#eosMode) | `V2` | Exactly Once Semantics mode; see [Exactly Once Semantics](#exactly-once). |
| []()[`fixTxOffsets`](#fixTxOffsets) | `false` |When consuming records produced by a transactional producer, and the consumer is positioned at the end of a partition, the lag can incorrectly be reported as greater than zero, due to the pseudo record used to indicate transaction commit/rollback and, possibly, the presence of rolled-back records.<br/>This does not functionally affect the consumer but some users have expressed concern that the "lag" is non-zero.<br/>Set this property to `true` and the container will correct such mis-reported offsets.<br/>The check is performed before the next poll to avoid adding significant complexity to the commit processing.<br/>At the time of writing, the lag will only be corrected if the consumer is configured with `isolation.level=read_committed` and `max.poll.records` is greater than 1.<br/>See [KAFKA-10683](https://issues.apache.org/jira/browse/KAFKA-10683) for more information.|
| []()[`groupId`](#groupId) | `null` | Overrides the consumer `group.id` property; automatically set by the `@KafkaListener` `id` or `groupId` property. |
| []()[`idleBeforeDataMultiplier`](#idleBeforeDataMultiplier) | 5.0 | Multiplier for `idleEventInterval` that is applied before any records are received.<br/>After a record is received, the multiplier is no longer applied.<br/>Available since version 2.8. |
| []()[`idleBetweenPolls`](#idleBetweenPolls) | 0 | Used to slow down deliveries by sleeping the thread between polls.<br/>The time to process a batch of records plus this value must be less than the `max.poll.interval.ms` consumer property. |
| []()[`idleEventInterval`](#idleEventInterval) | `null` | When set, enables publication of `ListenerContainerIdleEvent` s, see [Application Events](#events) and [Detecting Idle and Non-Responsive Consumers](#idle-containers).<br/>Also see `idleBeforeDataMultiplier`. |
|[]()[`idlePartitionEventInterval`](#idlePartitionEventInterval)| `null` | When set, enables publication of `ListenerContainerIdlePartitionEvent` s, see [Application Events](#events) and [Detecting Idle and Non-Responsive Consumers](#idle-containers). |
| []()[`kafkaConsumerProperties`](#kafkaConsumerProperties) | None | Used to override any arbitrary consumer properties configured on the consumer factory. |
| []()[`logContainerConfig`](#logContainerConfig) | `false` | Set to true to log at INFO level all container properties. |
| []()[`messageListener`](#messageListener) | `null` | The message listener. |
| []()[`micrometerEnabled`](#micrometerEnabled) | `true` | Whether or not to maintain Micrometer timers for the consumer threads. |
| []()[`missingTopicsFatal`](#missingTopicsFatal) | `false` | When true, prevents the container from starting if the configured topic(s) are not present on the broker. |
| []()[`monitorInterval`](#monitorInterval) | 30s | How often to check the state of the consumer threads for `NonResponsiveConsumerEvent` s.<br/>See `noPollThreshold` and `pollTimeout`. |
| []()[`noPollThreshold`](#noPollThreshold) | 3.0 | Multiplied by `pollTimeout` to determine whether to publish a `NonResponsiveConsumerEvent`.<br/>See `monitorInterval`. |
| []()[`onlyLogRecordMetadata`](#onlyLogRecordMetadata) | `false` | Set to false to log the complete consumer record (in error, debug logs etc) instead of just `topic-partition@offset`. |
| []()[`pollTimeout`](#pollTimeout) | 5000 | The timeout passed into `Consumer.poll()`. |
| []()[`scheduler`](#scheduler) |`ThreadPoolTaskScheduler`| A scheduler on which to run the consumer monitor task. |
| []()[`shutdownTimeout`](#shutdownTimeout) | 10000 | The maximum time in ms to block the `stop()` method until all consumers stop and before publishing the container stopped event. |
| []()[`stopContainerWhenFenced`](#stopContainerWhenFenced) | `false` | Stop the listener container if a `ProducerFencedException` is thrown.<br/>See [After-rollback Processor](#after-rollback) for more information. |
| []()[`stopImmediate`](#stopImmediate) | `false` | When the container is stopped, stop processing after the current record instead of after processing all the records from the previous poll. |
| []()[`subBatchPerPartition`](#subBatchPerPartition) | See desc. | When using a batch listener, if this is `true`, the listener is called with the results of the poll split into sub batches, one per partition.<br/>Default `false` except when using transactions with `EOSMode.ALPHA` - see [Exactly Once Semantics](#exactly-once). |
| []()[`syncCommitTimeout`](#syncCommitTimeout) | `null` | The timeout to use when `syncCommits` is `true`.<br/>When not set, the container will attempt to determine the `default.api.timeout.ms` consumer property and use that; otherwise it will use 60 seconds. |
| []()[`syncCommits`](#syncCommits) | `true` | Whether to use sync or async commits for offsets; see `commitCallback`. |
| []()[`topics` `topicPattern` `topicPartitions`](#topics) | n/a | The configured topics, topic pattern or explicitly assigned topics/partitions.<br/>Mutually exclusive; at least one must be provided; enforced by `ContainerProperties` constructors. |
| []()[`transactionManager`](#transactionManager) | `null` | See [Transactions](#transactions). |

The following properties are available on both listener container types:

|Property|Default|Description|
|---|---|---|
| []()[`afterRollbackProcessor`](#afterRollbackProcessor) |`DefaultAfterRollbackProcessor`| An `AfterRollbackProcessor` to invoke after a transaction is rolled back. |
|[]()[`applicationEventPublisher`](#applicationEventPublisher)| application context | The event publisher. |
| []()[`batchErrorHandler`](#batchErrorHandler) | See desc. | Deprecated - see `commonErrorHandler`. |
| []()[`batchInterceptor`](#batchInterceptor) | `null` | Set a `BatchInterceptor` to call before invoking the batch listener; does not apply to record listeners.<br/>Also see `interceptBeforeTx`. |
| []()[`beanName`](#beanName) | bean name | The bean name of the container; suffixed with `-n` for child containers. |
| []()[`commonErrorHandler`](#commonErrorHandler) | See desc. |`DefaultErrorHandler` or `null` when a `transactionManager` is provided when a `DefaultAfterRollbackProcessor` is used.<br/>See [Container Error Handlers](#error-handlers).|
| []()[`containerProperties`](#containerProperties) | `ContainerProperties` | The container properties instance. |
| []()[`errorHandler`](#errorHandler) | See desc. | Deprecated - see `commonErrorHandler`. |
| []()[`genericErrorHandler`](#genericErrorHandler) | See desc. | Deprecated - see `commonErrorHandler`. |
| []()[`groupId`](#groupId) | See desc. | The `containerProperties.groupId`, if present, otherwise the `group.id` property from the consumer factory. |
| []()[`interceptBeforeTx`](#interceptBeforeTx) | `true` | Determines whether the `recordInterceptor` is called before or after a transaction starts. |
| []()[`listenerId`](#listenerId) | See desc. | The bean name for user-configured containers or the `id` attribute of `@KafkaListener` s. |
| []()[`pauseRequested`](#pauseRequested) | (read only) | True if a consumer pause has been requested. |
| []()[`recordInterceptor`](#recordInterceptor) | `null` | Set a `RecordInterceptor` to call before invoking the record listener; does not apply to batch listeners.<br/>Also see `interceptBeforeTx`. |
| []()[`topicCheckTimeout`](#topicCheckTimeout) | 30s | When the `missingTopicsFatal` container property is `true`, how long to wait, in seconds, for the `describeTopics` operation to complete. |

The following properties are available only on `KafkaMessageListenerContainer`:

|Property|Default|Description|
|---|---|---|
| []()[`assignedPartitions`](#assignedPartitions) |(read only)| The partitions currently assigned to this container (explicitly or not). |
|[]()[`assignedPartitionsByClientId`](#assignedPartitionsByClientId)|(read only)| The partitions currently assigned to this container (explicitly or not). |
| []()[`clientIdSuffix`](#clientIdSuffix) | `null` |Used by the concurrent container to give each child container’s consumer a unique `client.id`.|
| []()[`containerPaused`](#containerPaused) | n/a | True if pause has been requested and the consumer has actually paused. |

The following properties are available only on `ConcurrentMessageListenerContainer`:

|Property|Default|Description|
|---|---|---|
| []()[`alwaysClientIdSuffix`](#alwaysClientIdSuffix) | `true` | Set to false to suppress adding a suffix to the `client.id` consumer property, when the `concurrency` is only 1. |
| []()[`assignedPartitions`](#assignedPartitions) |(read only)| The aggregate of partitions currently assigned to this container’s child `KafkaMessageListenerContainer` s (explicitly or not). |
|[]()[`assignedPartitionsByClientId`](#assignedPartitionsByClientId)|(read only)|The partitions currently assigned to this container’s child `KafkaMessageListenerContainer` s (explicitly or not), keyed by the child container’s consumer’s `client.id` property.|
| []()[`concurrency`](#concurrency) | 1 | The number of child `KafkaMessageListenerContainer` s to manage. |
| []()[`containerPaused`](#containerPaused) | n/a | True if pause has been requested and all child containers' consumers have actually paused. |
| []()[`containers`](#containers) | n/a | A reference to all child `KafkaMessageListenerContainer` s. |
#### [](#events)4.1.6. Application Events
The following Spring application events are published by listener containers and their consumers:
...
...
##### [](#idle-containers)Detecting Idle and Non-Responsive Consumers
While efficient, one problem with asynchronous consumers is detecting when they are idle.
You might want to take some action if no messages arrive for some period of time.
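To enable idle events, set the `idleEventInterval` container property; a sketch:

```java
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // publish a ListenerContainerIdleEvent if no records arrive for 60 seconds
    factory.getContainerProperties().setIdleEventInterval(60000L);
    return factory;
}
```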
...
...
Starting with version 2.6.2, if a container has published a `ListenerContainerIdleEvent`, it will publish a `ListenerContainerNoLongerIdleEvent` when a record is subsequently received.
##### [](#event-consumption)Event Consumption
You can capture these events by implementing `ApplicationListener` — either a general listener or one narrowed to only receive this specific event.
You can also use `@EventListener`, introduced in Spring Framework 4.2.
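For example, a sketch of an `@EventListener` method narrowed to idle events coming from listeners whose IDs start with `qux-` (the ID prefix is an assumption):

```java
public class Listener {

    @KafkaListener(id = "qux", topics = "annotated")
    public void listen(String data) {
        // ...
    }

    @EventListener(condition = "event.listenerId.startsWith('qux-')")
    public void eventHandler(ListenerContainerIdleEvent event) {
        // hand off to another thread if you intend to stop the container
    }
}
```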
...
...
| |If you wish to use the idle event to stop the listener container, you should not call `container.stop()` on the thread that calls the listener.<br/>Doing so causes delays and unnecessary log messages.<br/>Instead, you should hand off the event to a different thread that can then stop the container.<br/>Also, you should not `stop()` the container instance if it is a child container.<br/>You should stop the concurrent container instead.|
As discussed in [`@KafkaListener` Annotation](#kafka-listener-annotation), a `ConcurrentKafkaListenerContainerFactory` is used to create containers for annotated methods.
...
...
#### [](#thread-safety)4.1.10. Thread Safety
When using a concurrent message listener container, a single listener instance is invoked on all consumer threads.
Listeners, therefore, need to be thread-safe, and it is preferable to use stateless listeners.
...
...
| |By default, the application context’s event multicaster invokes event listeners on the calling thread.<br/>If you change the multicaster to use an async executor, thread cleanup is not effective.|
Starting with version 2.3, the listener container will automatically create and update Micrometer `Timer` s for the listener, if `Micrometer` is detected on the class path, and a single `MeterRegistry` is present in the application context.
The timers can be disabled by setting the `ContainerProperty` `micrometerEnabled` to `false`.
...
...
You can add additional tags using the `ContainerProperties` `micrometerTags` property.
| |With the concurrent container, timers are created for each thread and the `name` tag is suffixed with `-n` where n is `0` to `concurrency-1`.|
Starting with version 2.5, the template will automatically create and update Micrometer `Timer` s for send operations, if `Micrometer` is detected on the class path, and a single `MeterRegistry` is present in the application context.
The timers can be disabled by setting the template’s `micrometerEnabled` property to `false`.
...
...
You can add additional tags using the template’s `micrometerTags` property.
Starting with version 2.5, the framework provides [Factory Listeners](#factory-listeners) to manage a Micrometer `KafkaClientMetrics` instance whenever producers and consumers are created and closed.
A similar listener is provided for the `StreamsBuilderFactoryBean` - see [KafkaStreams Micrometer Support](#streams-micrometer).
#### [](#transactions)4.1.12. Transactions
This section describes how Spring for Apache Kafka supports transactions.
##### [](#overview-2)Overview
The 0.11.0.0 client library added support for transactions.
Spring for Apache Kafka adds support in the following ways:
...
...
With Spring Boot, it is only necessary to set the `spring.kafka.producer.transaction-id-prefix` property; Boot automatically configures a `KafkaTransactionManager` bean and wires it into the listener container.
| |Starting with version 2.5.8, you can now configure the `maxAge` property on the producer factory.<br/>This is useful when using transactional producers that might lay idle for the broker’s `transactional.id.expiration.ms`.<br/>With current `kafka-clients`, this can cause a `ProducerFencedException` without a rebalance.<br/>By setting the `maxAge` to less than `transactional.id.expiration.ms`, the factory will refresh the producer if it is past its max age.|
This section refers to producer-only transactions (transactions not started by a listener container); see [Using Consumer-Initiated Transactions](#container-transaction-manager) for information about chaining transactions when the container starts the transaction.
...
...
| |Starting with versions 2.5.17, 2.6.12, 2.7.9 and 2.8.0, if the commit fails on the synchronized transaction (after the primary transaction has committed), the exception will be thrown to the caller.<br/>Previously, this was silently ignored (logged at debug).<br/>Applications should take remedial action, if necessary, to compensate for the committed primary transaction.|
The `ChainedKafkaTransactionManager` is now deprecated, since version 2.7; see the javadocs for its super class `ChainedTransactionManager` for more information.
Instead, use a `KafkaTransactionManager` in the container to start the Kafka transaction and annotate the listener method with `@Transactional` to start the other transaction.
See [[ex-jdbc-sync]](#ex-jdbc-sync) for an example application that chains JDBC and Kafka transactions.
##### [](#kafkatemplate-local-transactions)`KafkaTemplate` Local Transactions
You can use the `KafkaTemplate` to execute a series of operations within a local transaction.
The following example shows how to do so:
...
...
If an exception is thrown, the transaction is rolled back.
| |If there is a `KafkaTransactionManager` (or synchronized) transaction in process, it is not used.<br/>Instead, a new "nested" transaction is used.|
As mentioned in [the overview](#transactions), the producer factory is configured with this property to build the producer `transactional.id` property.
There is a dichotomy when specifying this property in that, when running multiple instances of the application with `EOSMode.ALPHA`, it must be the same on all instances to satisfy fencing zombies (also mentioned in the overview) when producing records on a listener container thread.
...
...
This property must have a different value on each application instance.
This problem (different rules for `transactional.id`) has been eliminated when `EOSMode.BETA` is being used (with broker versions \>= 2.5); see [Exactly Once Semantics](#exactly-once).
##### [](#tx-template-mixed)`KafkaTemplate` Transactional and non-Transactional Publishing
Normally, when a `KafkaTemplate` is transactional (configured with a transaction-capable producer factory), transactions are required.
The transaction can be started by a `TransactionTemplate`, a `@Transactional` method, calling `executeInTransaction`, or by a listener container, when configured with a `KafkaTransactionManager`.
...
...
Starting with version 2.4.3, you can set the template’s `allowNonTransactional` property to `true`.
In that case, the template will allow the operation to run without a transaction, by calling the `ProducerFactory` 's `createNonTransactionalProducer()` method; the producer will be cached, or thread-bound, as normal for reuse.
See [Using `DefaultKafkaProducerFactory`](#producer-factory).
##### [](#transactions-batch)Transactions with Batch Listeners
When a listener fails while transactions are being used, the `AfterRollbackProcessor` is invoked to take some action after the rollback occurs.
When using the default `AfterRollbackProcessor` with a record listener, seeks are performed so that the failed record will be redelivered.
...
...
#### [](#exactly-once)4.1.13. Exactly Once Semantics
You can provide a listener container with a `KafkaAwareTransactionManager` instance.
When so configured, the container starts a transaction before invoking the listener.
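A hedged configuration sketch:

```java
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory,
        KafkaTransactionManager<String, String> transactionManager) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // the container starts a Kafka transaction before invoking the listener
    factory.getContainerProperties().setTransactionManager(transactionManager);
    return factory;
}
```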
...
...
`V1` and `V2` were previously `ALPHA` and `BETA`; they have been changed to align the framework with [KIP-732](https://cwiki.apache.org/confluence/display/KAFKA/KIP-732%3A+Deprecate+eos-alpha+and+replace+eos-beta+with+eos-v2).
#### [](#interceptors)4.1.14. Wiring Spring Beans into Producer/Consumer Interceptors
Apache Kafka provides a mechanism to add interceptors to producers and consumers.
These objects are managed by Kafka, not Spring, and so normal Spring dependency injection won’t work for wiring in dependent Spring Beans.
...
...
#### [](#pause-resume)4.1.15. Pausing and Resuming Listener Containers
Version 2.1.3 added `pause()` and `resume()` methods to listener containers.
Previously, you could pause a consumer within a `ConsumerAwareMessageListener` and resume it by listening for a `ListenerContainerIdleEvent`, which provides access to the `Consumer` object.
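For example, a sketch using the `KafkaListenerEndpointRegistry` (the listener `id` is a placeholder):

```java
@Autowired
private KafkaListenerEndpointRegistry registry;

public void pauseAndResume() {
    MessageListenerContainer container = registry.getListenerContainer("myListener");
    container.pause();   // takes effect before the next poll()
    // ...
    container.resume();  // takes effect after the current poll() returns
}
```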
#### [](#pause-resume-partitions)4.1.16. Pausing and Resuming Partitions on Listener Containers
Since version 2.7 you can pause and resume the consumption of specific partitions assigned to the consumer by using the `pausePartition(TopicPartition topicPartition)` and `resumePartition(TopicPartition topicPartition)` methods in the listener containers.
The pausing and resuming take place, respectively, before and after the `poll()`, similar to the `pause()` and `resume()` methods.
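For example (the topic and partition are placeholders):

```java
TopicPartition partition0 = new TopicPartition("myTopic", 0);
container.pausePartition(partition0);
// ...
if (container.isPartitionPaused(partition0)) {
    container.resumePartition(partition0);
}
```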
...
...
The `isPartitionPaused()` method returns true if that partition has effectively been paused.
Also since version 2.7 `ConsumerPartitionPausedEvent` and `ConsumerPartitionResumedEvent` instances are published with the container as the `source` property and the `TopicPartition` instance.
#### [](#serdes)4.1.17. Serialization, Deserialization, and Message Conversion
##### [](#overview-3)Overview
Apache Kafka provides a high-level API for serializing and deserializing record values as well as their keys.
It is present with the `org.apache.kafka.common.serialization.Serializer<T>` and `org.apache.kafka.common.serialization.Deserializer<T>` abstractions with some built-in implementations.
...
...
When you use this API, the `DefaultKafkaProducerFactory` and `DefaultKafkaConsumerFactory` also provide properties (through constructors or setter methods) to inject custom `Serializer` and `Deserializer` instances into the target `Producer` or `Consumer`.
Also, you can pass in `Supplier<Serializer>` or `Supplier<Deserializer>` instances through constructors - these `Supplier` s are called on creation of each `Producer` or `Consumer`.
##### [](#string-serde)String serialization
Since version 2.5, Spring for Apache Kafka provides `ToStringSerializer` and `ParseStringDeserializer` classes that use String representation of entities.
They rely on methods `toString` and some `Function<String>` or `BiFunction<String,Headers>` to parse the String and populate properties of an instance.
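For example, a sketch of a deserializer built from a parsing function (`Thing` and its `parse` method are hypothetical):

```java
ToStringSerializer<Thing> serializer = new ToStringSerializer<>();
ParseStringDeserializer<Thing> deserializer =
        new ParseStringDeserializer<>((str, headers) -> Thing.parse(str));
```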
...
...
The method must be static and have a signature of either `(String, Headers)` or `(String)`.
A `ToFromStringSerde` is also provided, for use with Kafka Streams.
##### [](#json-serde)JSON
Spring for Apache Kafka also provides `JsonSerializer` and `JsonDeserializer` implementations that are based on the
Jackson JSON object mapper.
...
...
Starting with version 2.1, you can convey type information in record `Headers`, allowing the handling of multiple types.
In addition, you can configure the serializer and deserializer by using the following Kafka properties.
They have no effect if you have provided `Serializer` and `Deserializer` instances for `KafkaConsumer` and `KafkaProducer`, respectively.
* `JsonSerializer.ADD_TYPE_INFO_HEADERS` (default `true`): You can set it to `false` to disable this feature on the `JsonSerializer` (sets the `addTypeInfo` property).
...
...
See also [[tip-json]](#tip-json).
| |Starting with version 2.8, if you construct the serializer or deserializer programmatically as shown in [Programmatic Construction](#prog-json), the above properties will be applied by the factories, as long as you have not set any properties explicitly (using `set*()` methods or using the fluent API).<br/>Previously, when creating programmatically, the configuration properties were never applied; this is still the case if you explicitly set properties on the object directly.|
Starting with version 2.2, when using JSON, you can now provide type mappings by using the properties in the preceding list.
Previously, you had to customize the type mapper within the serializer and deserializer.
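For example, a sketch of mapping tokens to classes on both sides (the class names are illustrative):

```java
// producer side
producerProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.Hat");
// consumer side - the same token can map to a different class
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.yourcat.YourCat, hat:com.yourhat.YourHat");
```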
...
...
```java
DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
        new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
```
###### [](#serdes-type-methods)Using Methods to Determine Types
Starting with version 2.5, you can now configure the deserializer, via properties, to invoke a method to determine the target type.
If present, this will override any of the other techniques discussed above.
...
...
```java
public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
    ...
}
```
###### [](#prog-json)Programmatic Construction
When constructing the serializer/deserializer programmatically for use in the producer/consumer factory, since version 2.3, you can use the fluent API, which simplifies configuration.
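For example, a sketch of the fluent API on the serializer side (`MyKeyType` and `MyValueType` are hypothetical):

```java
DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
        new JsonSerializer<MyKeyType>()
                .forKeys()
                .noTypeInfo(),
        new JsonSerializer<MyValueType>()
                .noTypeInfo());
```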
...
...
Alternatively, as long as you don’t use the fluent API to configure properties, or set them using `set*()` methods, the factories will configure the serializer/deserializer using the configuration properties; see [Configuration Properties](#serdes-json-config).
##### Delegating Serializer and Deserializer
###### Using Headers
Version 2.3 introduced the `DelegatingSerializer` and `DelegatingDeserializer`, which allow producing and consuming records with different key and/or value types.
Producers must set a header `DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR` to a selector value that is used to select which serializer to use for the value and `DelegatingSerializer.KEY_SERIALIZATION_SELECTOR` for the key; if a match is not found, an `IllegalStateException` is thrown.
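A producer-side sketch (the `thing1` selector value and the record are illustrative; the selector-to-serializer mapping itself is supplied via configuration):

```
ProducerRecord<String, Object> record = new ProducerRecord<>("topic1", myThing1);
// select which delegate serializer handles this record's value
record.headers().add(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR, "thing1".getBytes());
```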
...
...
This technique supports sending different types to the same topic (or different topics).
For another technique to send different types to different topics, see [Using `RoutingKafkaTemplate`](#routing-template).
###### By Type
Version 2.8 introduced the `DelegatingByTypeSerializer`.
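A sketch of a producer factory using it (the delegate mappings are illustrative):

```
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
    return new DefaultKafkaProducerFactory<>(config, null,
            new DelegatingByTypeSerializer(Map.of(
                    byte[].class, new ByteArraySerializer(),
                    Bytes.class, new BytesSerializer(),
                    String.class, new StringSerializer())));
}
```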
...
...
Starting with version 2.8.3, you can configure the serializer to check whether the map key is assignable from the target object, which is useful when a delegate serializer can serialize subclasses.
In this case, if there are ambiguous matches, an ordered `Map`, such as a `LinkedHashMap`, should be provided.
###### By Topic
Starting with version 2.8, the `DelegatingByTopicSerializer` and `DelegatingByTopicDeserializer` allow selection of a serializer/deserializer based on the topic name.
Regex `Pattern`s are used to look up the instance to use.
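A sketch (the patterns and delegates are illustrative):

```
new DelegatingByTopicSerializer(Map.of(
        Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
        Pattern.compile("topic[5-9]"), new StringSerializer()),
        new JsonSerializer<>());  // default when no pattern matches
```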
...
...
You can specify a default serializer/deserializer to use when there is no pattern match.
An additional property `DelegatingByTopicSerialization.CASE_SENSITIVE` (default `true`), when set to `false`, makes the topic lookup case insensitive.
##### Retrying Deserializer

The `RetryingDeserializer` uses a delegate `Deserializer` and `RetryTemplate` to retry deserialization when the delegate might have transient errors, such as network issues, during deserialization.
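A sketch (assuming an existing delegate deserializer and `RetryTemplate`):

```
ConsumerFactory<String, BigDecimal> cf = new DefaultKafkaConsumerFactory<>(consumerProps,
        new StringDeserializer(),
        new RetryingDeserializer<>(myBigDecimalDeserializer, retryTemplate));
```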
...
...
Refer to the [spring-retry](https://github.com/spring-projects/spring-retry) project for configuration of the `RetryTemplate` with a retry policy, back off policy, etc.
##### Spring Messaging Message Conversion

Although the `Serializer` and `Deserializer` API is quite simple and flexible from the low-level Kafka `Consumer` and `Producer` perspective, you might need more flexibility at the Spring Messaging level, when using either `@KafkaListener` or [Spring Integration’s Apache Kafka Support](https://docs.spring.io/spring-integration/docs/current/reference/html/kafka.html#kafka).
To let you easily convert to and from `org.springframework.messaging.Message`, Spring for Apache Kafka provides a `MessageConverter` abstraction with the `MessagingMessageConverter` implementation and its `JsonMessageConverter` (and subclasses) customization.
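For example, a record listener container factory might be configured with a `JsonMessageConverter` like this (a sketch; with Spring Boot the factory is usually auto-configured):

```
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setMessageConverter(new JsonMessageConverter());
    return factory;
}
```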
...
...
```
public void smart(Thing thing) {
    ...
}
```
###### Using Spring Data Projection Interfaces
Starting with version 2.1.1, you can convert JSON to a Spring Data Projection interface instead of a concrete type.
This allows very selective and low-coupled bindings to data, including the lookup of values from multiple places inside the JSON document.
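For example, a sketch of a projection interface (the JSON paths are illustrative):

```
interface SomeSample {

    @JsonPath({ "$.username", "$.user.name" })
    String getUsername();

}
```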
...
...
You must also add `spring-data:spring-data-commons` and `com.jayway.jsonpath:json-path` to the class path.
When used as the parameter to a `@KafkaListener` method, the interface type is automatically passed to the converter as normal.
##### Payload Conversion with Batch Listeners
You can also use a `JsonMessageConverter` within a `BatchMessagingMessageConverter` to convert batch messages when you use a batch listener container factory.
See [Serialization, Deserialization, and Message Conversion](#serdes) and [Spring Messaging Message Conversion](#messaging-message-conversion) for more information.
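A sketch of such a factory:

```
@Bean
public KafkaListenerContainerFactory<?> kafkaBatchListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true);
    factory.setMessageConverter(new BatchMessagingMessageConverter(new JsonMessageConverter()));
    return factory;
}
```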
...
...
Starting with version 2.1.1, the `org.springframework.core.convert.ConversionService` used by the default `o.s.messaging.handler.annotation.support.MessageHandlerMethodFactory` to resolve parameters for the invocation of a listener method is supplied with all beans that implement any of the following interfaces:
...
...
| |Setting a custom `MessageHandlerMethodFactory` on the `KafkaListenerEndpointRegistrar` through a `KafkaListenerConfigurer` bean disables this feature.|
##### Adding custom `HandlerMethodArgumentResolver` to `@KafkaListener`
Starting with version 2.4.2, you can add your own `HandlerMethodArgumentResolver` and resolve custom method parameters.
All you need to do is implement `KafkaListenerConfigurer` and use the `setCustomMethodArgumentResolvers()` method from class `KafkaListenerEndpointRegistrar`.
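A sketch (the `CustomMethodArgument` type is illustrative):

```
@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setCustomMethodArgumentResolvers(
                new HandlerMethodArgumentResolver() {

                    @Override
                    public boolean supportsParameter(MethodParameter parameter) {
                        return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
                    }

                    @Override
                    public Object resolveArgument(MethodParameter parameter, Message<?> message) {
                        // build the custom argument from, e.g., a message header
                        return new CustomMethodArgument(
                                message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class));
                    }

                });
    }

}
```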
...
...
See also [Null Payloads and Log Compaction of 'Tombstone' Records](#tombstones).
#### 4.1.18. Message Headers
The 0.11.0.0 client introduced support for headers in messages.
As of version 2.0, Spring for Apache Kafka supports mapping these headers to and from `spring-messaging` `MessageHeaders`.
If you use Spring Boot, it auto-configures this converter bean into the auto-configured `KafkaTemplate`; otherwise, you should add this converter to the template.
#### 4.1.19. Null Payloads and Log Compaction of 'Tombstone' Records
When you use [Log Compaction](https://kafka.apache.org/documentation/#compaction), you can send and receive messages with `null` payloads to identify the deletion of a key.
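For example, a sketch:

```
// producing a tombstone for the key "foo"
template.send("topic1", "foo", null);

// receiving it; the parameter must be marked as not required to receive a null payload
@KafkaListener(id = "deletes", topics = "topic1")
public void listen(@Payload(required = false) String value,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
    // value is null for a tombstone record
}
```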
...
...
Note that the argument is `null`, not `KafkaNull`.
| |This feature requires the use of a `KafkaNullAwarePayloadArgumentResolver` which the framework will configure when using the default `MessageHandlerMethodFactory`.<br/>When using a custom `MessageHandlerMethodFactory`, see [Adding custom `HandlerMethodArgumentResolver` to `@KafkaListener`](#custom-arg-resolve).|
#### 4.1.20. Container Error Handlers

Starting with version 2.8, the legacy `ErrorHandler` and `BatchErrorHandler` interfaces have been superseded by a new `CommonErrorHandler`.
These error handlers can handle errors for both record and batch listeners, allowing a single listener container factory to create containers for both types of listener.
`CommonErrorHandler` implementations to replace most legacy framework error handler implementations are provided, and the legacy error handlers are deprecated.
...
...
The container commits any pending offset commits before calling the error handler.
If you are using Spring Boot, you simply need to add the error handler as a `@Bean` and Boot will add it to the auto-configured factory.
##### DefaultErrorHandler
This new error handler replaces the `SeekToCurrentErrorHandler` and `RecoveringBatchErrorHandler`, which have been the default error handlers for several releases now.
One difference is that the fallback behavior for batch listeners (when an exception other than a `BatchListenerFailedException` is thrown) is the equivalent of the [Retrying Complete Batches](#retrying-batch-eh).
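For example, a sketch that retries a failed delivery twice, one second apart, before invoking the fallback behavior:

```
@Bean
public DefaultErrorHandler errorHandler() {
    return new DefaultErrorHandler(new FixedBackOff(1000L, 2L));
}
```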
...
...
By default, the exception type is not considered.
Also see [Delivery Attempts Header](#delivery-header).
#### 4.1.21. Conversion Errors with Batch Error Handlers
Starting with version 2.8, batch listeners can now properly handle conversion errors when using a `MessageConverter` with a `ByteArrayDeserializer`, a `BytesDeserializer`, or a `StringDeserializer`, as well as a `DefaultErrorHandler`.
When a conversion error occurs, the payload is set to null and a deserialization exception is added to the record headers, similar to the `ErrorHandlingDeserializer`.
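A sketch of a batch listener that inspects the conversion failures (assuming a hypothetical `Thing` type):

```
@KafkaListener(id = "batch", topics = "topic1")
void listen(List<Thing> in,
        @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
    for (int i = 0; i < in.size(); i++) {
        if (in.get(i) == null && exceptions.get(i) != null) {
            // handle the record that failed conversion
        }
        else {
            // process in.get(i)
        }
    }
}
```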
...
...
This is now the fallback behavior of the `DefaultErrorHandler` for a batch listener where the listener throws an exception other than a `BatchListenerFailedException`.
...
...
Before exiting, regardless of the outcome, the consumer is resumed.
While waiting for a `BackOff` interval, the error handler will loop with a short sleep until the desired delay is reached, while checking to see if the container has been stopped, allowing the sleep to exit soon after the `stop()` rather than causing a delay.
The `CommonContainerStoppingErrorHandler` stops the container if the listener throws an exception.
For record listeners, when the `AckMode` is `RECORD`, offsets for already processed records are committed.
...
...
After the container stops, an exception that wraps the `ListenerExecutionFailedException` is thrown.
This is to cause the transaction to roll back (if transactions are enabled).
##### Delegating Error Handler
The `CommonDelegatingErrorHandler` can delegate to different error handlers, depending on the exception type.
For example, you may wish to invoke a `DefaultErrorHandler` for most exceptions, or a `CommonContainerStoppingErrorHandler` for others.
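A sketch (the exception type is illustrative):

```
DefaultErrorHandler defaultHandler = new DefaultErrorHandler(new FixedBackOff(1000L, 2L));
CommonDelegatingErrorHandler errorHandler = new CommonDelegatingErrorHandler(defaultHandler);
// stop the container for fatal exceptions; everything else falls back to the default handler
errorHandler.addDelegate(MyFatalException.class, new CommonContainerStoppingErrorHandler());
```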
##### Logging Error Handler
The `CommonLoggingErrorHandler` simply logs the exception; with a record listener, the remaining records from the previous poll are passed to the listener.
For a batch listener, all the records in the batch are logged.
##### Using Different Common Error Handlers for Record and Batch Listeners
If you wish to use a different error handling strategy for record and batch listeners, the `CommonMixedErrorHandler` is provided, allowing the configuration of a specific error handler for each listener type.
##### Common Error Handler Summary
* `DefaultErrorHandler`
...
...
* `CommonMixedErrorHandler`
##### Legacy Error Handlers and Their Replacements
The following applies to record listeners only, not batch listeners.
...
...
It is disabled by default to avoid the (small) overhead of looking up the state for each record.
The `DefaultErrorHandler` and `DefaultAfterRollbackProcessor` support this feature.
##### Publishing Dead-letter Records
You can configure the `DefaultErrorHandler` and `DefaultAfterRollbackProcessor` with a record recoverer when the maximum number of failures is reached for a record.
The framework provides the `DeadLetterPublishingRecoverer`, which publishes the failed message to another topic.
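A sketch wiring the recoverer into a `DefaultErrorHandler`:

```
@Bean
public DefaultErrorHandler dlqErrorHandler(KafkaOperations<Object, Object> template) {
    return new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(template), new FixedBackOff(1000L, 2L));
}
```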
...
...
Starting with version 2.7, the recoverer checks that the partition selected by the destination resolver actually exists.
If the partition is not present, the partition in the `ProducerRecord` is set to `null`, allowing the `KafkaProducer` to select the partition.
You can disable this check by setting the `verifyPartition` property to `false`.
##### Managing Dead Letter Record Headers
Referring to [Publishing Dead-letter Records](#dead-letters) above, the `DeadLetterPublishingRecoverer` has two properties used to manage headers when those headers already exist (such as when reprocessing a dead letter record that failed, including when using [Non-Blocking Retries](#retry-topic)).
...
...
Also see [Failure Header Management](#retry-headers) with [Non-Blocking Retries](#retry-topic).
Spring Framework provides a number of `BackOff` implementations.
By default, the `ExponentialBackOff` retries indefinitely; to give up after some number of retry attempts requires calculating the `maxElapsedTime`.
Since version 2.7.3, Spring for Apache Kafka provides `ExponentialBackOffWithMaxRetries`, a subclass that receives the `maxRetries` property and calculates the `maxElapsedTime` automatically, which is a little more convenient.
Configured with six retries, a one-second initial interval, a multiplier of 2, and a ten-second max interval, this will retry after `1, 2, 4, 8, 10, 10` seconds, before calling the recoverer.
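A sketch of that configuration (assuming an existing `recoverer`):

```
ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(6);
backOff.setInitialInterval(1_000L);
backOff.setMultiplier(2.0);
backOff.setMaxInterval(10_000L);
DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, backOff);
```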
#### 4.1.22. JAAS and Kerberos
Starting with version 2.0, a `KafkaJaasLoginModuleInitializer` class has been added to assist with Kerberos configuration.
You can add this bean, with the desired configuration, to your application context.
...
...
```
@Bean
public KafkaJaasLoginModuleInitializer jaasConfig() throws IOException {
    KafkaJaasLoginModuleInitializer jaasConfig = new KafkaJaasLoginModuleInitializer();
    jaasConfig.setControlFlag("REQUIRED");
    Map<String, String> options = new HashMap<>();
    options.put("useKeyTab", "true");
    options.put("storeKey", "true");
    options.put("keyTab", "/etc/security/keytabs/kafka_client.keytab");
    options.put("principal", "kafka-client-1@EXAMPLE.COM");
    jaasConfig.setOptions(options);
    return jaasConfig;
}
```
### 4.2. Apache Kafka Streams Support
Starting with version 1.1.4, Spring for Apache Kafka provides first-class support for [Kafka Streams](https://kafka.apache.org/documentation/streams).
To use it from a Spring application, the `kafka-streams` jar must be present on the classpath.
It is an optional dependency of the Spring for Apache Kafka project and is not downloaded transitively.
#### 4.2.1. Basics
The reference Apache Kafka Streams documentation suggests the following way of using the API:
...
...
| |All `KStream` instances exposed to a `KafkaStreams` instance by a single `StreamsBuilder` are started and stopped at the same time, even if they have different logic.<br/>In other words, all streams defined by a `StreamsBuilder` are tied with a single lifecycle control.<br/>Once a `KafkaStreams` instance has been closed by `streams.close()`, it cannot be restarted.<br/>Instead, a new `KafkaStreams` instance to restart stream processing must be created.|
To simplify using Kafka Streams from the Spring application context perspective and use the lifecycle management through a container, the Spring for Apache Kafka introduces `StreamsBuilderFactoryBean`.
This is an `AbstractFactoryBean` implementation to expose a `StreamsBuilder` singleton instance as a bean.
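For example:

```
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
```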
...
...
Default no-op implementations are provided to avoid having to implement both methods.
A `CompositeKafkaStreamsInfrastructureCustomizer` is provided, for when you need to apply multiple customizers.
#### 4.2.3. KafkaStreams Micrometer Support
Introduced in version 2.5.3, you can configure a `KafkaStreamsMicrometerListener` to automatically register micrometer meters for the `KafkaStreams` object managed by the factory bean:
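A sketch (assuming an available `MeterRegistry` bean):

```
streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry));
```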
#### 4.2.4. Streams JSON Serialization and Deserialization
For serializing and deserializing data when reading or writing to topics or state stores in JSON format, Spring for Apache Kafka provides a `JsonSerde` implementation that uses JSON, delegating to the `JsonSerializer` and `JsonDeserializer` described in [Serialization, Deserialization, and Message Conversion](#serdes).
The `JsonSerde` implementation provides the same configuration options through its constructor (target type or `ObjectMapper`).
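For example, a sketch reading a topic of JSON values (assuming a hypothetical `Cat` type):

```
JsonSerde<Cat> catSerde = new JsonSerde<>(Cat.class);
KStream<Integer, Cat> stream = builder.stream("cats", Consumed.with(Serdes.Integer(), catSerde));
```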
#### 4.2.5. Using `KafkaStreamBrancher`
The `KafkaStreamBrancher` class introduces a more convenient way to build conditional branches on top of `KStream`.
...
...
```
new KafkaStreamBrancher<String, String>()
        .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
        .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
        .defaultBranch(ks -> ks.to("C"))
        //onTopOf method returns the provided stream so we can continue with method chaining
        .onTopOf(builder.stream("source"));
```
#### 4.2.6. Configuration
To configure the Kafka Streams environment, the `StreamsBuilderFactoryBean` requires a `KafkaStreamsConfiguration` instance.
See the Apache Kafka [documentation](https://kafka.apache.org/0102/documentation/#streamsconfigs) for all possible options.
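A sketch of such a configuration bean (the broker address is illustrative):

```
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfig() {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return new KafkaStreamsConfiguration(props);
}
```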
...
...
By default, when the factory bean is stopped, the `KafkaStreams.cleanUp()` method is called.
Starting with version 2.1.2, the factory bean has additional constructors, taking a `CleanupConfig` object that has properties to let you control whether the `cleanUp()` method is called during `start()` or `stop()` or neither.
Starting with version 2.7, the default is to never clean up local state.
#### 4.2.7. Header Enricher

Version 2.3 added the `HeaderEnricher` implementation of `Transformer`.
This can be used to add headers within the stream processing; the header values are SpEL expressions; the root object of the expression evaluation has three properties:

* `context` - the `ProcessorContext`, allowing access to the current record metadata
* `key` - the key of the current record
* `value` - the value of the current record
#### 4.2.8. `MessagingTransformer`

Version 2.3 added the `MessagingTransformer`, which allows a Kafka Streams topology to interact with a Spring Messaging component, such as a Spring Integration flow.
The transformer requires an implementation of `MessagingFunction`.
...
...
Spring Integration automatically provides an implementation using its `GatewayProxyFactoryBean`.
It also requires a `MessagingMessageConverter` to convert the key, value and metadata (including headers) to/from a Spring Messaging `Message<?>`.
See [Calling a Spring Integration Flow from a `KStream`](https://docs.spring.io/spring-integration/docs/current/reference/html/kafka.html#streams-integration) for more information.
#### 4.2.9. Recovery from Deserialization Exceptions
Version 2.3 introduced the `RecoveringDeserializationExceptionHandler` which can take some action when a deserialization exception occurs.
Refer to the Kafka documentation about `DeserializationExceptionHandler`, of which the `RecoveringDeserializationExceptionHandler` is an implementation.
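A sketch of the relevant configuration (assuming `streamsConfig` is the streams configuration `Map` and `recoverer` is a `ConsumerRecordRecoverer` bean):

```
streamsConfig.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
        RecoveringDeserializationExceptionHandler.class);
streamsConfig.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer);
```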
...
...
Of course, the `recoverer()` bean can be your own implementation of `ConsumerRecordRecoverer`.
#### 4.2.10. Kafka Streams Example
The following example combines all the topics we have covered in this chapter:
...
...
### 4.3. Testing Applications
The `spring-kafka-test` jar contains some useful utilities to assist with testing your applications.
#### 4.3.1. KafkaTestUtils
`o.s.kafka.test.utils.KafkaTestUtils` provides a number of static helper methods to consume records, retrieve various record offsets, and others.
Refer to its [Javadocs](https://docs.spring.io/spring-kafka/docs/current/api/org/springframework/kafka/test/utils/KafkaTestUtils.html) for complete details.
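For example:

```
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, "topic1");
```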
#### 4.3.2. JUnit
`o.s.kafka.test.utils.KafkaTestUtils` also provides some static methods to set up producer and consumer properties.
The following listing shows those method signatures:
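Abbreviated, they have roughly the following shapes (see the Javadocs for the authoritative signatures):

```
public static Map<String, Object> consumerProps(String group, String autoCommit,
        EmbeddedKafkaBroker embeddedKafka) { /* ... */ }

public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) { /* ... */ }
```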
...
...
Convenient constants (`EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS` and `EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT`) are provided for this property.
With the `EmbeddedKafkaBroker.brokerProperties(Map<String, String>)`, you can provide additional properties for the Kafka servers.
See [Kafka Config](https://kafka.apache.org/documentation/#brokerconfigs) for more information about possible broker properties.
The following example configuration creates topics called `cat` and `hat` with five partitions, a topic called `thing1` with 10 partitions, and a topic called `thing2` with 15 partitions:
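A sketch using the JUnit 4 `EmbeddedKafkaRule`:

```
public class MyTests {

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, 5, "cat", "hat");

    @Test
    public void test() {
        embeddedKafka.getEmbeddedKafka()
                .addTopics(new NewTopic("thing1", 10, (short) 1), new NewTopic("thing2", 15, (short) 1));
        // ...
    }

}
```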
...
...
By default, `addTopics` will throw an exception when problems arise (such as adding a topic that already exists).
Version 2.6 added a new version of that method that returns a `Map<String, Exception>`; the key is the topic name and the value is `null` for success, or an `Exception` for a failure.
#### 4.3.4. Using the Same Broker(s) for Multiple Test Classes
There is no built-in support for doing so, but you can use the same broker for multiple test classes with something similar to the following:
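A sketch of such a holder class:

```
public final class EmbeddedKafkaHolder {

    private static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaBroker(1, false)
            .brokerListProperty("spring.kafka.bootstrap-servers");

    private static boolean started;

    public static EmbeddedKafkaBroker getEmbeddedKafka() {
        if (!started) {
            try {
                embeddedKafka.afterPropertiesSet();
            }
            catch (Exception e) {
                throw new KafkaException("Embedded broker failed to start", e);
            }
            started = true;
        }
        return embeddedKafka;
    }

    private EmbeddedKafkaHolder() {
    }

}
```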
...
...
| |The preceding example provides no mechanism for shutting down the broker(s) when all tests are complete.<br/>This could be a problem if, say, you run your tests in a Gradle daemon.<br/>You should not use this technique in such a situation, or you should use something to call `destroy()` on the `EmbeddedKafkaBroker` when your tests are complete.|
We generally recommend that you use the rule as a `@ClassRule` to avoid starting and stopping the broker between tests (and use a different topic for each test).
Starting with version 2.0, if you use Spring’s test application context caching, you can also declare an `EmbeddedKafkaBroker` bean, so a single broker can be used across multiple test classes.
...
...
Properties defined by `brokerProperties` override properties found in `brokerPropertiesLocation`.
You can use the `@EmbeddedKafka` annotation with JUnit 4 or JUnit 5.
#### 4.3.6. @EmbeddedKafka Annotation with JUnit5
Starting with version 2.3, there are two ways to use the `@EmbeddedKafka` annotation with JUnit5.
When used with the `@SpringJunitConfig` annotation, the embedded broker is added to the test application context.
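For example:

```
@SpringJUnitConfig
@EmbeddedKafka(topics = "someTopic")
public class MyApplicationTests {

    @Autowired
    private EmbeddedKafkaBroker broker;

    @Test
    void test() {
        // use the broker ...
    }

    @Configuration
    static class Config {
    }

}
```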
...
...
| |When there is a Spring test application context available, the topics and broker properties can contain property placeholders, which will be resolved as long as the property is defined somewhere.<br/>If there is no Spring context available, these placeholders won’t be resolved.|
### 4.4. Non-Blocking Retries

| |This is an experimental feature and the usual rule of no breaking API changes does not apply to this feature until the experimental designation is removed.<br/>Users are encouraged to try out the feature and provide feedback via GitHub Issues or GitHub discussions.<br/>This is regarding the API only; the feature is considered to be complete and robust.|
Achieving non-blocking retry/DLT functionality with Kafka usually requires setting up extra topics and creating and configuring the corresponding listeners.
Since 2.7, Spring for Apache Kafka offers support for that via the `@RetryableTopic` annotation and the `RetryTopicConfiguration` class to simplify that bootstrapping.
#### 4.4.1. How The Pattern Works
If message processing fails, the message is forwarded to a retry topic with a back off timestamp.
The retry topic consumer then checks the timestamp and, if it is not due, pauses the consumption for that topic’s partition.
...
...
The framework also takes care of creating the topics and setting up and configuring the listeners.
| |At this time this functionality doesn’t support class level `@KafkaListener` annotations|
#### 4.4.2. Back Off Delay Precision
##### Overview and Guarantees
All message processing and backing off is handled by the consumer thread, and, as such, delay precision is guaranteed on a best-effort basis.
If one message’s processing takes longer than the next message’s back off period for that consumer, the next message’s delay will be higher than expected.
...
...
That being said, for consumers handling a single partition, the message’s processing should happen approximately at its exact due time for most situations.
| |It is guaranteed that a message will never be processed before its due time.|
##### Tuning the Delay Precision
The message’s processing delay precision relies on two `ContainerProperties`: `ContainerProperties.pollTimeout` and `ContainerProperties.idlePartitionEventInterval`.
Both properties will be automatically set in the retry topic and dlt’s `ListenerContainerFactory` to one quarter of the smallest delay value for that topic, with a minimum value of 250ms and a maximum value of 5000ms.
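If you provide your own `ListenerContainerFactory` instances for the retry topics, you can, for example, set these properties yourself (a sketch):

```
factory.getContainerProperties().setPollTimeout(250L);
factory.getContainerProperties().setIdlePartitionEventInterval(250L);
```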
...
...
This way you can tune the precision and performance for the retry topics if you need to.
| |You can have separate `ListenerContainerFactory` instances for the main and retry topics - this way you can have different settings to better suit your needs, such as having a higher polling timeout setting for the main topics and a lower one for the retry topics.|
##### Using the `@RetryableTopic` annotation
To configure the retry topic and dlt for a `@KafkaListener` annotated method, you just have to add the `@RetryableTopic` annotation to it and Spring for Apache Kafka will bootstrap all the necessary topics and consumers with the default configurations.
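For example (the template bean name is illustrative):

```
@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
    // ... message processing
}
```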
...
...
| |If you don’t specify a kafkaTemplate name a bean with name `retryTopicDefaultKafkaTemplate` will be looked up.<br/>If no bean is found an exception is thrown.|
If you’re using fixed delay policies such as `FixedBackOffPolicy` or `NoBackOffPolicy`, you can use a single topic to accomplish the non-blocking retries.
This topic will be suffixed with the provided or default suffix, and will not have either the index or the delay values appended.
...
...
| |The default behavior is creating separate retry topics for each attempt, appended with their index value: retry-0, retry-1, …|
You can set the global timeout for the retrying process.
If that time is reached, the next time the consumer throws an exception the message goes straight to the DLT, or just ends the processing if no DLT is available.
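For example, using the builder (values in milliseconds; a sketch):

```
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(3000)
            .maxAttempts(5)
            .timeoutAfter(5000)
            .create(template);
}
```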
...
...
| |The default is having no timeout set, which can also be achieved by providing -1 as the timeout value.|
##### Include and Exclude Topics
You can decide which topics will and will not be handled by a `RetryTopicConfiguration` bean via the `.includeTopic(String topic)`, `.includeTopics(Collection<String> topics)`, `.excludeTopic(String topic)`, and `.excludeTopics(Collection<String> topics)` methods.
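For example:

```
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .includeTopic("my-included-topic")
            .create(template);
}
```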
...
...
| |The default behavior is to include all topics.|
When considering how to manage failure headers (original headers and exception headers), the framework delegates to the `DeadLetterPublishingRecoverer` to decide whether to append or replace the headers.
##### Appending the Topic’s Index or Delay
You can either append the topic’s index or delay values after the suffix.
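For example, to suffix with the topic’s index instead of the delay value (a sketch, assuming the builder’s `suffixTopicsWithIndexValues()` option):

```
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .suffixTopicsWithIndexValues()
            .create(template);
}
```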
...
...
| |The default behavior is to suffix with the delay values, except for fixed delay configurations with multiple topics, in which case the topics are suffixed with the topic’s index.|
More complex naming strategies can be accomplished by registering a bean that implements `RetryTopicNamesProviderFactory`. The default implementation is `SuffixingRetryTopicNamesProviderFactory` and a different implementation can be registered in the following way:
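A sketch of such an implementation (based on the default suffixing provider; the prefix is illustrative), which would then be registered as a bean:

```
public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {

    @Override
    public RetryTopicNamesProvider createRetryTopicNamesProvider(DestinationTopic.Properties properties) {
        // delegate to the default suffixing provider, but prefix every generated topic name
        return new SuffixingRetryTopicNamesProvider(properties) {

            @Override
            public String getTopicName(String topic) {
                return "my-prefix-" + super.getTopicName(topic);
            }

        };
    }

}
```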
...
...
#### 4.4.6. Dlt Strategies
The framework provides a few strategies for working with DLTs.
You can provide a method for DLT processing, use the default logging method, or have no DLT at all.
Also, you can choose what happens if DLT processing fails.
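The DLT processing method can be provided via the `@DltHandler` annotation, for example (a sketch):

```
@DltHandler
public void processDltMessage(MyPojo message) {
    // ... message processing, persistence, etc.
}
```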
Should the DLT processing fail, there are two possible behaviors available: `ALWAYS_RETRY_ON_ERROR` and `FAIL_ON_ERROR`.
...
...
See [Exception Classifier](#retry-topic-ex-classifier) for more information.
##### Configuring No DLT
The framework also provides the possibility of not configuring a DLT for the topic.
In this case, after the retries are exhausted, the processing simply ends.
...
...
```
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .doNotConfigureDlt()
            .create(template);
}
```
#### 4.4.7. Specifying a ListenerContainerFactory
By default, the `RetryTopic` configuration uses the factory provided in the `@KafkaListener` annotation, but you can specify a different one to be used to create the retry topic and DLT listener containers.
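For example (the factory bean name is illustrative):

```
@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
```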
The quickest way to get started is to use [start.spring.io](https://start.spring.io) (or the wizards in Spring Tool Suite and IntelliJ IDEA) and create a project, selecting 'Spring for Apache Kafka' as a dependency. See the [Spring Boot documentation](https://docs.spring.io/spring-boot/docs/current/reference/html/spring-boot-features.html#boot-features-kafka) for more information about its opinionated auto-configuration of the infrastructure beans.
The following is a minimal consumer application.
##### Spring Boot Consumer App
In some cases, such as rebalancing, a message that has already been processed may be redelivered. The framework cannot know whether such a message has been processed; that is an application-level function. This is known as the [Idempotent Receiver](https://www.enterpriseintegrationpatterns.com/patterns/messaging/IdempotentReceiver.html) pattern, and Spring Integration provides an implementation of it.
...
...
The `KafkaTransactionManager` is an implementation of the Spring Framework’s `PlatformTransactionManager`. It is provided with a reference to the producer factory in its constructor. If you provide a custom producer factory, it must support transactions; see `ProducerFactory.transactionCapable()`.
You can use the `KafkaTransactionManager` with normal Spring transaction support (`@Transactional`, `TransactionTemplate`, and so on). If a transaction is active, any `KafkaTemplate` operations performed within the scope of the transaction use the transaction’s `Producer`. The manager commits or rolls back the transaction, depending on success or failure. You must configure the `KafkaTemplate` to use the same `ProducerFactory` as the transaction manager.
When you use this API, the `DefaultKafkaProducerFactory` and `DefaultKafkaConsumerFactory` also provide properties (through constructors or setter methods) to inject custom `Serializer` and `Deserializer` instances into the target `Producer` or `Consumer`. Also, you can pass in `Supplier<Serializer>` or `Supplier<Deserializer>` instances through constructors; these `Supplier`s are called when each `Producer` or `Consumer` is created.
##### String serialization
Since version 2.5, Spring for Apache Kafka provides `ToStringSerializer` and `ParseStringDeserializer` classes that use the String representation of entities. They rely on the `toString` method and some `Function<String>` or `BiFunction<String, Headers>` to parse the String and populate properties of an instance. Usually, this invokes some static method on the class, such as `parse`:
...
...
When a deserializer fails to deserialize a message, Spring has no way to handle the problem, because it occurs before the `poll()` returns. To solve this problem, the `ErrorHandlingDeserializer` was introduced. This deserializer delegates to a real deserializer (for the key or the value). If the delegate fails to deserialize the record content, the `ErrorHandlingDeserializer` returns a `null` value and a `DeserializationException` in a header that contains the cause and the raw bytes. When you use a record-level `MessageListener`, if the `ConsumerRecord` contains a `DeserializationException` header for either the key or the value, the container’s `ErrorHandler` is called with the failed `ConsumerRecord`, and the record is not passed to the listener.
Spring Framework provides a number of `BackOff` implementations. By default, the `ExponentialBackOff` retries indefinitely; to give up after some number of retry attempts requires calculating the `maxElapsedTime`. Since version 2.7.3, Spring for Apache Kafka provides `ExponentialBackOffWithMaxRetries`, a subclass that receives the `maxRetries` property and calculates the `maxElapsedTime` automatically, which is a little more convenient.
Starting with version 1.1.4, Spring for Apache Kafka provides first-class support for [Kafka Streams](https://kafka.apache.org/documentation/streams). To use it from a Spring application, the `kafka-streams` jar must be present on the classpath. It is an optional dependency of the Spring for Apache Kafka project and is not downloaded transitively.
To simplify using Kafka Streams from the Spring application context perspective and manage the lifecycle through a container, Spring for Apache Kafka introduces `StreamsBuilderFactoryBean`. This is an `AbstractFactoryBean` implementation that exposes a `StreamsBuilder` singleton instance as a bean. The following example creates such a bean:
...
...
Version 2.3 added the `MessagingTransformer`, which allows a Kafka Streams topology to interact with a Spring Messaging component, such as a Spring Integration flow. The transformer requires an implementation of `MessagingFunction`.
...
...
Spring Integration automatically provides an implementation using its `GatewayProxyFactoryBean`. It also requires a `MessagingMessageConverter` to convert the key, value, and metadata (including headers) to and from a Spring Messaging `Message<?>`. See [Calling a Spring Integration Flow from a `KStream`](https://docs.spring.io/spring-integration/docs/current/reference/html/kafka.html#streams-integration) for more information.
When you use Spring for Apache Kafka in a Spring Boot application, the Apache Kafka dependency versions are determined by Spring Boot’s dependency management. If you wish to use a different version of `kafka-clients` or `kafka-streams` and use the embedded Kafka broker for testing, you need to override the versions used by Spring Boot’s dependency management and add two `test` artifacts for Apache Kafka.
| |There is a bug in Apache Kafka 3.0.0, [KAFKA-13391](https://issues.apache.org/jira/browse/KAFKA-13391), when running the embedded broker on Microsoft Windows.<br/>To use the embedded broker on Windows, downgrade the Apache Kafka version to 2.8.1 until 3.0.1 is available.<br/>When using 2.8.1, you also need to exclude the `zookeeper` dependency from `spring-kafka-test`.|