RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);

<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
        ParameterizedTypeReference<P> returnType);
```
These use the template’s default `replyTimeout`; there are also overloaded versions that take a timeout in the method call.
Use the first method if the consumer’s `Deserializer` or the template’s `MessageConverter` can convert the payload without any additional information, either via configuration or type metadata in the reply message.
Use the second method if you need to provide type information for the return type, to assist the message converter.
This also allows the same template to receive different types, even if there is no type metadata in the replies, such as when the server side is not a Spring application.
The following is an example of the latter:
Example 6. Template Bean
Java
```
@Bean
ReplyingKafkaTemplate<String, String, String> template(
        ProducerFactory<String, String> pf,
        ConcurrentKafkaListenerContainerFactory<String, String> factory) {
    ConcurrentMessageListenerContainer<String, String> replyContainer =
            factory.createContainer("replies");
    replyContainer.getContainerProperties().setGroupId("request.replies");
    ReplyingKafkaTemplate<String, String, String> template =
            new ReplyingKafkaTemplate<>(pf, replyContainer);
    template.setMessageConverter(new ByteArrayJsonMessageConverter());
    template.setDefaultTopic("requests");
    return template;
}
```
Kotlin
```
@Bean
fun template(
    pf: ProducerFactory<String?, String>?,
    factory: ConcurrentKafkaListenerContainerFactory<String?, String?>
): ReplyingKafkaTemplate<String?, String, String?> {
    val replyContainer = factory.createContainer("replies")
    replyContainer.containerProperties.groupId = "request.replies"
    val template = ReplyingKafkaTemplate(pf, replyContainer)
    template.messageConverter = ByteArrayJsonMessageConverter()
    template.defaultTopic = "requests"
    return template
}
```
Example 7. Using the template
Java
```
RequestReplyTypedMessageFuture<String, String, Thing> future1 =
        template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
                new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());

RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
        template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
                new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));
```
Kotlin
```
val future1: RequestReplyTypedMessageFuture<String?, String?, Thing?>? =
    template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
        object : ParameterizedTypeReference<Thing?>() {})
log.info(future1?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata?.toString())
val thing = future1?.get(10, TimeUnit.SECONDS)?.payload
log.info(thing.toString())

val future2: RequestReplyTypedMessageFuture<String?, String?, List<Thing?>?>? =
    template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
        object : ParameterizedTypeReference<List<Thing?>?>() {})
log.info(future2?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata.toString())
val things = future2?.get(10, TimeUnit.SECONDS)?.payload
things?.forEach(Consumer { thing1: Thing? -> log.info(thing1.toString()) })
```
##### Reply Type Message<?>
When the `@KafkaListener` returns a `Message<?>`, with versions before 2.5, it was necessary to populate the reply topic and correlation id headers.
In this example, we use the reply topic header from the request:
```
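@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in,
        @Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,
        @Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) {
    // Copy the reply topic and correlation id from the request headers.
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .setHeader(KafkaHeaders.MESSAGE_KEY, 42)
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .build();
}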
```
This also shows how to set a key on the reply record.
Starting with version 2.5, the framework will detect if these headers are missing and populate them with the topic - either the topic determined from the `@SendTo` value or the incoming `KafkaHeaders.REPLY_TOPIC` header (if present).
It will also echo the incoming `KafkaHeaders.CORRELATION_ID` and `KafkaHeaders.REPLY_PARTITION`, if present.
```
@KafkaListener(id = "requestor", topics = "request")
@SendTo  // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.MESSAGE_KEY, 42)
            .build();
}
```
##### Aggregating Multiple Replies
The template in [Using `ReplyingKafkaTemplate`](#replying-template) is strictly for a single request/reply scenario.
For cases where multiple receivers of a single message return a reply, you can use the `AggregatingReplyingKafkaTemplate`.
This is an implementation of the client-side of the [Scatter-Gather Enterprise Integration Pattern](https://www.enterpriseintegrationpatterns.com/patterns/messaging/BroadcastAggregate.html).
Like the `ReplyingKafkaTemplate`, the `AggregatingReplyingKafkaTemplate` constructor takes a producer factory and a listener container to receive the replies.
It has a third parameter, `BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy`, which is consulted each time a reply is received.
When the predicate returns `true`, the collection of `ConsumerRecord`s is used to complete the `Future` returned by the `sendAndReceive` method.
There is an additional property `returnPartialOnTimeout` (default false).
When this is set to `true`, instead of completing the future with a `KafkaReplyTimeoutException`, a partial result completes the future normally (as long as at least one reply record has been received).
Starting with version 2.3.5, the predicate is also called after a timeout (if `returnPartialOnTimeout` is `true`).
The first argument is the current list of records; the second is `true` if this call is due to a timeout.
The predicate can modify the list of records.
```
AggregatingReplyingKafkaTemplate<Integer, String, String> template =
        new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
                (coll, timedOut) -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
        template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
        future.get(30, TimeUnit.SECONDS);
```
Notice that the return type is a `ConsumerRecord` with a value that is a collection of `ConsumerRecord`s.
The "outer" `ConsumerRecord` is not a "real" record; it is synthesized by the template as a holder for the actual reply records received for the request.
When a normal release occurs (release strategy returns true), the topic is set to `aggregatedResults`; if `returnPartialOnTimeout` is true, and timeout occurs (and at least one reply record has been received), the topic is set to `partialResultsAfterTimeout`.
The template provides constant static variables for these "topic" names:
```
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a normal release by the release strategy.
*/
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a timeout.
*/
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";
```
The real `ConsumerRecord` s in the `Collection` contain the actual topic(s) from which the replies are received.
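The two-argument release strategy described above can be illustrated with plain Java. The following is only a sketch: `ReleaseStrategySketch`, `releaseWhen`, and both thresholds are hypothetical names introduced here, and plain strings stand in for the reply `ConsumerRecord`s. It releases normally once the expected number of replies has arrived and, when called because of a timeout, accepts a partial result only if a minimum number of replies is present.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

final class ReleaseStrategySketch {

    // Hypothetical helper (not part of the framework): builds a predicate with the
    // same shape as the releaseStrategy parameter, BiPredicate<List<...>, Boolean>.
    static <R> BiPredicate<List<R>, Boolean> releaseWhen(int expected, int minimumOnTimeout) {
        return (replies, timedOut) -> timedOut
                ? replies.size() >= minimumOnTimeout   // call made after a timeout
                : replies.size() >= expected;          // call made when a reply arrives
    }

    public static void main(String[] args) {
        BiPredicate<List<String>, Boolean> strategy = releaseWhen(3, 2);
        List<String> replies = new ArrayList<>(List.of("reply-1", "reply-2"));
        System.out.println("release on reply:   " + strategy.test(replies, false));
        System.out.println("release on timeout: " + strategy.test(replies, true));
    }
}
```

A predicate of this shape could be passed as the third constructor argument, with the element type replaced by the real reply record type.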
| |The listener container for the replies MUST be configured with `AckMode.MANUAL` or `AckMode.MANUAL_IMMEDIATE`; the consumer property `enable.auto.commit` must be `false` (the default since version 2.3).
To avoid any possibility of losing messages, the template only commits offsets when there are zero requests outstanding, i.e. when the last outstanding request is released by the release strategy.
After a rebalance, duplicate reply deliveries are possible; these are ignored for any in-flight requests, and you may see error log messages when duplicate replies are received for already released replies.|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| |If you use an [`ErrorHandlingDeserializer`](#error-handling-deserializer) with this aggregating template, the framework will not automatically detect `DeserializationException` s.
Instead, the record (with a `null` value) will be returned intact, with the deserialization exception(s) in headers.
Applications should call the utility method `ReplyingKafkaTemplate.checkDeserialization()` to determine whether a deserialization exception occurred.
See its javadocs for more information.
The `replyErrorChecker` is also not called for this aggregating template; you should perform the checks on each element of the reply.|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
#### 4.1.4. Receiving Messages
You can receive messages by configuring a `MessageListenerContainer` and providing a message listener or by using the `@KafkaListener` annotation.
##### Message Listeners
When you use a [message listener container](#message-listener-container), you must provide a listener to receive data.
There are currently eight supported interfaces for message listeners.
The following listing shows these interfaces:
```
public interface MessageListener<K, V> { (1)
    void onMessage(ConsumerRecord<K, V> data);
}
public interface AcknowledgingMessageListener<K, V> { (2)
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { (3)
    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { (4)
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
public interface BatchMessageListener<K, V> { (5)
    void onMessage(List<ConsumerRecord<K, V>> data);
}
public interface BatchAcknowledgingMessageListener<K, V> { (6)
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { (7)
    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { (8)
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
```
|**1**| Use this interface for processing individual `ConsumerRecord` instances received from the Kafka consumer `poll()` operation when using auto-commit or one of the container-managed [commit methods](#committing-offsets). |
|-----|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|**2**| Use this interface for processing individual `ConsumerRecord` instances received from the Kafka consumer `poll()` operation when using one of the manual [commit methods](#committing-offsets). |
|**3**| Use this interface for processing individual `ConsumerRecord` instances received from the Kafka consumer `poll()` operation when using auto-commit or one of the container-managed [commit methods](#committing-offsets).
Access to the `Consumer` object is provided. |
|**4**| Use this interface for processing individual `ConsumerRecord` instances received from the Kafka consumer `poll()` operation when using one of the manual [commit methods](#committing-offsets).
Access to the `Consumer` object is provided. |
|**5**| Use this interface for processing all `ConsumerRecord` instances received from the Kafka consumer `poll()` operation when using auto-commit or one of the container-managed [commit methods](#committing-offsets). `AckMode.RECORD` is not supported when you use this interface, since the listener is given the complete batch. |
|**6**| Use this interface for processing all `ConsumerRecord` instances received from the Kafka consumer `poll()` operation when using one of the manual [commit methods](#committing-offsets). |
|**7**|Use this interface for processing all `ConsumerRecord` instances received from the Kafka consumer `poll()` operation when using auto-commit or one of the container-managed [commit methods](#committing-offsets). `AckMode.RECORD` is not supported when you use this interface, since the listener is given the complete batch.
Access to the `Consumer` object is provided.|
|**8**| Use this interface for processing all `ConsumerRecord` instances received from the Kafka consumer `poll()` operation when using one of the manual [commit methods](#committing-offsets).
Access to the `Consumer` object is provided. |
| |The `Consumer` object is not thread-safe.
You must only invoke its methods on the thread that calls the listener.|
|---|---------------------------------------------------------------------------------------------------------------------|
| |You should not execute any `Consumer<?, ?>` methods that affect the consumer’s positions and/or committed offsets in your listener; the container needs to manage such information.|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
##### Message Listener Containers
Two `MessageListenerContainer` implementations are provided:
* `KafkaMessageListenerContainer`
* `ConcurrentMessageListenerContainer`
The `KafkaMessageListenerContainer` receives all messages from all topics or partitions on a single thread.
The `ConcurrentMessageListenerContainer` delegates to one or more `KafkaMessageListenerContainer` instances to provide multi-threaded consumption.
Starting with version 2.2.7, you can add a `RecordInterceptor` to the listener container; it will be invoked before calling the listener allowing inspection or modification of the record.
If the interceptor returns null, the listener is not called.
Starting with version 2.7, it has additional methods which are called after the listener exits (normally, or by throwing an exception).
Also, starting with version 2.7, there is now a `BatchInterceptor`, providing similar functionality for [Batch Listeners](#batch-listeners).
In addition, the `ConsumerAwareRecordInterceptor` (and `BatchInterceptor`) provide access to the `Consumer<?, ?>`.
This might be used, for example, to access the consumer metrics in the interceptor.
| |You should not execute any methods that affect the consumer’s positions and/or committed offsets in these interceptors; the container needs to manage such information.|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
The `CompositeRecordInterceptor` and `CompositeBatchInterceptor` can be used to invoke multiple interceptors.
By default, starting with version 2.8, when using transactions, the interceptor is invoked before the transaction has started.
You can set the listener container’s `interceptBeforeTx` property to `false` to invoke the interceptor after the transaction has started instead.
Starting with versions 2.3.8, 2.4.6, the `ConcurrentMessageListenerContainer` now supports [Static Membership](https://kafka.apache.org/documentation/#static_membership) when the concurrency is greater than one.
The `group.instance.id` is suffixed with `-n` with `n` starting at `1`.
This, together with an increased `session.timeout.ms`, can be used to reduce rebalance events, for example, when application instances are restarted.
###### Using `KafkaMessageListenerContainer`
The following constructor is available:
```
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)
```
It receives a `ConsumerFactory` and information about topics and partitions, as well as other configuration, in a `ContainerProperties` object.
`ContainerProperties` has the following constructors:
```
public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)
```
The first constructor takes an array of `TopicPartitionOffset` arguments to explicitly instruct the container about which partitions to use (using the consumer `assign()` method) and with an optional initial offset.
A positive value is an absolute offset by default.
A negative value is relative to the current last offset within a partition by default.
A constructor for `TopicPartitionOffset` that takes an additional `boolean` argument is provided.
If this is `true`, the initial offsets (positive or negative) are relative to the current position for this consumer.
The offsets are applied when the container is started.
The second takes an array of topics, and Kafka allocates the partitions based on the `group.id` property — distributing partitions across the group.
The third uses a regex `Pattern` to select the topics.
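The initial-offset rules for `TopicPartitionOffset` can be summarized as a small calculation. The sketch below is a plain-Java model of the rules stated above, not the container's code: `InitialOffsetSketch`, `resolve`, and its `currentPosition` and `endOffset` inputs are hypothetical, and clamping the result at zero is an assumption made here to keep the model well defined.

```java
final class InitialOffsetSketch {

    // requested:         the offset given in the TopicPartitionOffset
    // relativeToCurrent: the additional boolean constructor argument
    // currentPosition:   hypothetical stand-in for the consumer's current position
    // endOffset:         hypothetical stand-in for the partition's current last offset
    static long resolve(long requested, boolean relativeToCurrent,
            long currentPosition, long endOffset) {
        if (relativeToCurrent) {
            // Positive or negative values are applied to the current position.
            return Math.max(0, currentPosition + requested);
        }
        if (requested >= 0) {
            return requested; // absolute offset
        }
        return Math.max(0, endOffset + requested); // relative to the end of the partition
    }

    public static void main(String[] args) {
        System.out.println(resolve(5, false, 10, 100));
        System.out.println(resolve(-3, false, 10, 100));
        System.out.println(resolve(5, true, 10, 100));
        System.out.println(resolve(-3, true, 10, 100));
    }
}
```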
To assign a `MessageListener` to a container, you can use the `ContainerProperties.setMessageListener` method when creating the container.
The following example shows how to do so:
```
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
        new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
```
Note that when creating a `DefaultKafkaConsumerFactory`, using the constructor that just takes in the properties as above means that key and value `Deserializer` classes are picked up from configuration.
Alternatively, `Deserializer` instances may be passed to the `DefaultKafkaConsumerFactory` constructor for key and/or value, in which case all Consumers share the same instances.
Another option is to provide `Supplier` s (starting with version 2.3) that will be used to obtain separate `Deserializer` instances for each `Consumer`:
```
DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
        new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, CustomValue> container =
        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
```
Refer to the [Javadoc](https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ContainerProperties.html) for `ContainerProperties` for more information about the various properties that you can set.
Since version 2.1.1, a new property called `logContainerConfig` is available.
When `true` and `INFO` logging is enabled each listener container writes a log message summarizing its configuration properties.
By default, logging of topic offset commits is performed at the `DEBUG` logging level.
Starting with version 2.1.2, a property in `ContainerProperties` called `commitLogLevel` lets you specify the log level for these messages.
For example, to change the log level to `INFO`, you can use `containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);`.
Starting with version 2.2, a new container property called `missingTopicsFatal` has been added (default: `false` since 2.3.4).
This prevents the container from starting if any of the configured topics are not present on the broker.
It does not apply if the container is configured to listen to a topic pattern (regex).
Previously, the container threads looped within the `consumer.poll()` method waiting for the topic to appear while logging many messages.
Aside from the logs, there was no indication that there was a problem.
As of version 2.8, a new container property `authExceptionRetryInterval` has been introduced.
This causes the container to retry fetching messages after getting any `AuthenticationException` or `AuthorizationException` from the `KafkaConsumer`.
This can happen when, for example, the configured user is denied access to read a certain topic or credentials are incorrect.
Defining `authExceptionRetryInterval` allows the container to recover when proper permissions are granted.
| |By default, no interval is configured - authentication and authorization errors are considered fatal, which causes the container to stop.|
|---|-----------------------------------------------------------------------------------------------------------------------------------------|
Starting with version 2.8, when creating the consumer factory, if you provide deserializers as objects (in the constructor or via the setters), the factory will invoke the `configure()` method to configure them with the configuration properties.
###### Using `ConcurrentMessageListenerContainer`
The single constructor is similar to the `KafkaMessageListenerContainer` constructor.
The following listing shows the constructor’s signature:
```
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)
```
It also has a `concurrency` property.
For example, `container.setConcurrency(3)` creates three `KafkaMessageListenerContainer` instances.
When the container properties are configured with topics or a topic pattern, Kafka distributes the partitions across the consumers by using its group management capabilities.
| |When listening to multiple topics, the default partition distribution may not be what you expect.
For example, if you have three topics with five partitions each and you want to use `concurrency=15`, you see only five active consumers, each assigned one partition from each topic, with the other 10 consumers being idle.
This is because the default Kafka `PartitionAssignor` is the `RangeAssignor` (see its Javadoc).
For this scenario, you may want to consider using the `RoundRobinAssignor` instead, which distributes the partitions across all of the consumers.
Then, each consumer is assigned one partition.
To change the `PartitionAssignor`, you can set the `partition.assignment.strategy` consumer property (`ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG`) in the properties provided to the `DefaultKafkaConsumerFactory`.
When using Spring Boot, you can set the strategy as follows:
```
spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor
```|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
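The arithmetic behind the note above (three topics, five partitions each, `concurrency=15`) can be checked with a simplified model. This is a sketch only: `AssignorSketch` and its methods are hypothetical, they count partitions per consumer rather than reproduce the Kafka assignor implementations, and they assume every consumer subscribes to every topic.

```java
import java.util.Arrays;

final class AssignorSketch {

    // Range-style assignment: each topic is divided independently, so the same
    // leading consumers receive partitions from every topic.
    static int[] rangeCounts(int topics, int partitionsPerTopic, int consumers) {
        int[] counts = new int[consumers];
        int base = partitionsPerTopic / consumers;
        int extra = partitionsPerTopic % consumers;
        for (int t = 0; t < topics; t++) {
            for (int c = 0; c < consumers; c++) {
                counts[c] += base + (c < extra ? 1 : 0);
            }
        }
        return counts;
    }

    // Round-robin-style assignment: all topic partitions are dealt out in one cycle.
    static int[] roundRobinCounts(int topics, int partitionsPerTopic, int consumers) {
        int[] counts = new int[consumers];
        int total = topics * partitionsPerTopic;
        for (int p = 0; p < total; p++) {
            counts[p % consumers]++;
        }
        return counts;
    }

    // Number of consumers that received at least one partition.
    static long active(int[] counts) {
        return Arrays.stream(counts).filter(n -> n > 0).count();
    }

    public static void main(String[] args) {
        System.out.println(active(rangeCounts(3, 5, 15)));      // prints 5
        System.out.println(active(roundRobinCounts(3, 5, 15))); // prints 15
    }
}
```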
When the container properties are configured with `TopicPartitionOffset` s, the `ConcurrentMessageListenerContainer` distributes the `TopicPartitionOffset` instances across the delegate `KafkaMessageListenerContainer` instances.
If, say, six `TopicPartitionOffset` instances are provided and the `concurrency` is `3`, each container gets two partitions.
For five `TopicPartitionOffset` instances, two containers get two partitions, and the third gets one.
If the `concurrency` is greater than the number of `TopicPartitions`, the `concurrency` is adjusted down such that each container gets one partition.
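The distribution just described can be modeled in a few lines. The following sketch reproduces only the per-container partition counts from the examples above; `PartitionDistributionSketch` and `distribute` are hypothetical names, and which specific partitions the framework gives to each delegate container is not modeled.

```java
import java.util.Arrays;

final class PartitionDistributionSketch {

    // Returns how many partitions each delegate container receives.
    static int[] distribute(int partitions, int concurrency) {
        // Concurrency is adjusted down so that no container is left without a partition.
        int containers = Math.min(partitions, concurrency);
        int[] counts = new int[containers];
        for (int p = 0; p < partitions; p++) {
            counts[p % containers]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(distribute(6, 3))); // prints [2, 2, 2]
        System.out.println(Arrays.toString(distribute(5, 3))); // prints [2, 2, 1]
        System.out.println(Arrays.toString(distribute(2, 5))); // prints [1, 1]
    }
}
```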
| |The `client.id` property (if set) is appended with `-n` where `n` is the consumer instance that corresponds to the concurrency.
This is required to provide unique names for MBeans when JMX is enabled.|
|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
Starting with version 1.3, the `MessageListenerContainer` provides access to the metrics of the underlying `KafkaConsumer`.
In the case of `ConcurrentMessageListenerContainer`, the `metrics()` method returns the metrics for all the target `KafkaMessageListenerContainer` instances.
The metrics are grouped into the `Map` by the `client-id` provided for the underlying `KafkaConsumer`.
Starting with version 2.3, the `ContainerProperties` provides an `idleBetweenPolls` option that lets the main loop in the listener container sleep between `KafkaConsumer.poll()` calls.
The actual sleep interval is the minimum of the provided value and the difference between the `max.poll.interval.ms` consumer config and the current record batch processing time.
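The sleep interval calculation can be written out as follows. This is a minimal sketch of the rule as stated in the text; `IdleBetweenPollsSketch` and `sleepMillis` are hypothetical, and clamping at zero (so that a slow batch never produces a negative sleep) is an assumption made here, as is ignoring any safety margin the container may apply.

```java
final class IdleBetweenPollsSketch {

    // All values are in milliseconds.
    static long sleepMillis(long idleBetweenPolls, long maxPollIntervalMs, long batchProcessingMs) {
        long remaining = maxPollIntervalMs - batchProcessingMs;
        return Math.max(0, Math.min(idleBetweenPolls, remaining));
    }

    public static void main(String[] args) {
        // Plenty of time left before max.poll.interval.ms: sleep for the configured value.
        System.out.println(sleepMillis(1_000, 300_000, 2_000));
        // Processing used most of the interval: the sleep is shortened.
        System.out.println(sleepMillis(5_000, 10_000, 8_000));
    }
}
```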
###### Committing Offsets
Several options are provided for committing offsets.
If the `enable.auto.commit` consumer property is `true`, Kafka auto-commits the offsets according to its configuration.
If it is `false`, the containers support several `AckMode` settings (described in the next list).
The default `AckMode` is `BATCH`.
Starting with version 2.3, the framework sets `enable.auto.commit` to `false` unless explicitly set in the configuration.
Previously, the Kafka default (`true`) was used if the property was not set.
The consumer `poll()` method returns one or more `ConsumerRecords`.
The `MessageListener` is called for each record.
The following list describes the action taken by the container for each `AckMode` (when transactions are not being used):
* `RECORD`: Commit the offset when the listener returns after processing the record.
* `BATCH`: Commit the offset when all the records returned by the `poll()` have been processed.
* `TIME`: Commit the offset when all the records returned by the `poll()` have been processed, as long as the `ackTime` since the last commit has been exceeded.
* `COUNT`: Commit the offset when all the records returned by the `poll()` have been processed, as long as `ackCount` records have been received since the last commit.
* `COUNT_TIME`: Similar to `TIME` and `COUNT`, but the commit is performed if either condition is `true`.
* `MANUAL`: The message listener is responsible for calling `acknowledge()` on the `Acknowledgment`.
After that, the same semantics as `BATCH` are applied.
* `MANUAL_IMMEDIATE`: Commit the offset immediately when the `Acknowledgment.acknowledge()` method is called by the listener.
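The `COUNT`, `TIME`, and `COUNT_TIME` conditions in the list above amount to a simple decision made after each poll's records have been processed. The sketch below models that decision in plain Java; `AckModeSketch`, its `Mode` enum, and `shouldCommit` are hypothetical and do not use the framework's types, and the exact comparison operators are an assumption.

```java
final class AckModeSketch {

    enum Mode { COUNT, TIME, COUNT_TIME }

    // Evaluated once all records returned by a poll have been processed.
    static boolean shouldCommit(Mode mode, int recordsSinceCommit, int ackCount,
            long millisSinceCommit, long ackTime) {
        boolean countReached = recordsSinceCommit >= ackCount;
        boolean timeExceeded = millisSinceCommit > ackTime;
        switch (mode) {
            case COUNT:
                return countReached;
            case TIME:
                return timeExceeded;
            default:
                return countReached || timeExceeded; // COUNT_TIME: either condition
        }
    }

    public static void main(String[] args) {
        System.out.println(shouldCommit(Mode.COUNT, 50, 100, 6_000, 5_000));
        System.out.println(shouldCommit(Mode.TIME, 50, 100, 6_000, 5_000));
        System.out.println(shouldCommit(Mode.COUNT_TIME, 50, 100, 6_000, 5_000));
    }
}
```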
When using [transactions](#transactions), the offset(s) are sent to the transaction and the semantics are equivalent to `RECORD` or `BATCH`, depending on the listener type (record or batch).
| |`MANUAL` and `MANUAL_IMMEDIATE` require the listener to be an `AcknowledgingMessageListener` or a `BatchAcknowledgingMessageListener`.
See [Message Listeners](#message-listeners).|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
Depending on the `syncCommits` container property, the `commitSync()` or `commitAsync()` method on the consumer is used.
`syncCommits` is `true` by default; also see `setSyncCommitTimeout`.
See `setCommitCallback` to get the results of asynchronous commits; the default callback is the `LoggingCommitCallback` which logs errors (and successes at debug level).
Because the listener container has its own mechanism for committing offsets, it prefers the Kafka `ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG` to be `false`.
Starting with version 2.3, it unconditionally sets it to false unless specifically set in the consumer factory or the container’s consumer property overrides.
The `Acknowledgment` has the following method:
```
public interface Acknowledgment {
void acknowledge();
}
```
This method gives the listener control over when offsets are committed.
Starting with version 2.3, the `Acknowledgment` interface has two additional methods `nack(long sleep)` and `nack(int index, long sleep)`.
The first one is used with a record listener, the second with a batch listener.
Calling the wrong method for your listener type will throw an `IllegalStateException`.
| |If you want to commit a partial batch by using `nack()` when using transactions, set the `AckMode` to `MANUAL`; invoking `nack()` will send the offsets of the successfully processed records to the transaction.|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| |`nack()` can only be called on the consumer thread that invokes your listener.|
|---|------------------------------------------------------------------------------|
With a record listener, when `nack()` is called, any pending offsets are committed, the remaining records from the last poll are discarded, and seeks are performed on their partitions so that the failed record and unprocessed records are redelivered on the next `poll()`.
The consumer thread can be paused before redelivery, by setting the `sleep` argument.
This is similar functionality to throwing an exception when the container is configured with a `DefaultErrorHandler`.
When using a batch listener, you can specify the index within the batch where the failure occurred.
When `nack()` is called, offsets will be committed for records before the index and seeks are performed on the partitions for the failed and discarded records so that they will be redelivered on the next `poll()`.
See [Container Error Handlers](#error-handlers) for more information.
| |When using partition assignment via group management, it is important to ensure the `sleep` argument (plus the time spent processing records from the previous poll) is less than the consumer `max.poll.interval.ms` property.|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
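For a batch listener, the effect of `nack(index, sleep)` on commits and seeks can be modeled as below. This is an illustrative sketch, not the container's implementation: `BatchNackSketch` and its methods are hypothetical, each record is represented as a `long[] {partition, offset}` pair, and the committed value follows the Kafka convention of "last processed offset plus one".

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class BatchNackSketch {

    // Offsets to commit: for each partition, one past the last record before `index`.
    static Map<Integer, Long> commits(List<long[]> batch, int index) {
        Map<Integer, Long> result = new LinkedHashMap<>();
        for (int i = 0; i < index; i++) {
            long[] rec = batch.get(i);
            result.put((int) rec[0], rec[1] + 1);
        }
        return result;
    }

    // Seek positions: for each partition, the first offset at or after `index`,
    // so the failed and discarded records are fetched again by the next poll.
    static Map<Integer, Long> seeks(List<long[]> batch, int index) {
        Map<Integer, Long> result = new LinkedHashMap<>();
        for (int i = index; i < batch.size(); i++) {
            long[] rec = batch.get(i);
            result.putIfAbsent((int) rec[0], rec[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        List<long[]> batch = List.of(
                new long[] {0, 10}, new long[] {0, 11}, new long[] {1, 5},
                new long[] {0, 12}, new long[] {1, 6});
        int failedIndex = 2; // the record at partition 1, offset 5 failed
        System.out.println("commit: " + commits(batch, failedIndex));
        System.out.println("seek:   " + seeks(batch, failedIndex));
    }
}
```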
###### Listener Container Auto Startup
The listener containers implement `SmartLifecycle`, and `autoStartup` is `true` by default.
The containers are started in a late phase (`Integer.MAX_VALUE - 100`).
Other components that implement `SmartLifecycle`, to handle data from listeners, should be started in an earlier phase.
The `- 100` leaves room for later phases to enable components to be auto-started after the containers.
##### Manually Committing Offsets
Normally, when using `AckMode.MANUAL` or `AckMode.MANUAL_IMMEDIATE`, records must be acknowledged in order, because Kafka does not maintain state for each record, only a committed offset for each group/partition.
Starting with version 2.8, you can now set the container property `asyncAcks`, which allows the acknowledgments for records returned by the poll to be acknowledged in any order.
The listener container will defer the out-of-order commits until the missing acknowledgments are received.
The consumer will be paused (no new records delivered) until all the offsets for the previous poll have been committed.
| |While this feature allows applications to process records asynchronously, it should be understood that it increases the possibility of duplicate deliveries after a failure.|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
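The idea of deferring out-of-order commits can be sketched without the framework. The following is a simplified, single-partition model, not the container's code; `DeferredCommitSketch` and `acknowledge` are hypothetical names. An acknowledgment only advances the committable offset when every earlier offset has also been acknowledged.

```java
import java.util.TreeSet;

// Hypothetical single-partition model of deferring out-of-order acknowledgments.
final class DeferredCommitSketch {

    // First offset that has not yet been committed.
    private long nextToCommit;

    // Offsets acknowledged out of order, waiting for the gap before them to close.
    private final TreeSet<Long> pending = new TreeSet<>();

    DeferredCommitSketch(long firstOffset) {
        this.nextToCommit = firstOffset;
    }

    // Returns the offset to commit (the next offset to read), or -1 if the commit is deferred.
    long acknowledge(long offset) {
        pending.add(offset);
        boolean advanced = false;
        while (!pending.isEmpty() && pending.first() == nextToCommit) {
            pending.pollFirst();
            nextToCommit++;
            advanced = true;
        }
        return advanced ? nextToCommit : -1;
    }
}
```

For a poll that returned offsets 10 to 12, acknowledging 12 first commits nothing; acknowledging 10 allows a commit of 11, and acknowledging 11 then allows a commit of 13. If the application fails while 12 is acknowledged but 10 is not, all three records are redelivered, which is the duplicate-delivery risk noted above.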
##### `@KafkaListener` Annotation
The `@KafkaListener` annotation is used to designate a bean method as a listener for a listener container.
The bean is wrapped in a `MessagingMessageListenerAdapter` configured with various features, such as converters to convert the data, if necessary, to match the method parameters.
You can configure most attributes on the annotation with SpEL by using `#{…}` or property placeholders (`${…}`).
See the [Javadoc](https://docs.spring.io/spring-kafka/api/org/springframework/kafka/annotation/KafkaListener.html) for more information.
###### Record Listeners
The `@KafkaListener` annotation provides a mechanism for simple POJO listeners.
The following example shows how to use it:
```
public class Listener {
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
...
}
}
```
This mechanism requires an `@EnableKafka` annotation on one of your `@Configuration` classes and a listener container factory, which is used to configure the underlying `ConcurrentMessageListenerContainer`.
By default, a bean with name `kafkaListenerContainerFactory` is expected.
The following example shows how to use `ConcurrentMessageListenerContainer`:
```
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
...
return props;
}
}
```
Notice that, to set container properties, you must use the `getContainerProperties()` method on the factory.
It is used as a template for the actual properties injected into the container.
Starting with version 2.1.1, you can now set the `client.id` property for consumers created by the annotation.
The `clientIdPrefix` is suffixed with `-n`, where `n` is an integer representing the container number when using concurrency.
Starting with version 2.2, you can now override the container factory’s `concurrency` and `autoStartup` properties by using properties on the annotation itself.
The properties can be simple values, property placeholders, or SpEL expressions.
The following example shows how to do so:
```
@KafkaListener(id = "myListener", topics = "myTopic",
autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
...
}
```
###### Explicit Partition Assignment
You can also configure POJO listeners with explicit topics and partitions (and, optionally, their initial offsets).
The following example shows how to do so:
```
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
```
You can specify each partition in the `partitions` or `partitionOffsets` attribute but not both.
As with most annotation properties, you can use SpEL expressions; for an example of how to generate a large list of partitions, see [the tip on assigning all partitions](#tip-assign-all-parts).
Starting with version 2.5.5, you can apply an initial offset to all assigned partitions:
```
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" },
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
```
The `*` wildcard represents all partitions in the `partitions` attribute.
There must be only one `@PartitionOffset` with the wildcard in each `@TopicPartition`.
In addition, when the listener implements `ConsumerSeekAware`, `onPartitionsAssigned` is now called, even when using manual assignment.
This allows, for example, any arbitrary seek operations at that time.
Starting with version 2.6.4, you can specify a comma-delimited list of partitions, or partition ranges:
```
@KafkaListener(id = "pp", autoStartup = "false",
topicPartitions = @TopicPartition(topic = "topic1",
partitions = "0-5, 7, 10-15"))
public void process(String in) {
...
}
```
The range is inclusive; the example above will assign partitions `0, 1, 2, 3, 4, 5, 7, 10, 11, 12, 13, 14, 15`.
The same technique can be used when specifying initial offsets:
```
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1",
partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
```
The initial offset will be applied to all 6 partitions.
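To make the range syntax concrete, here is a minimal JDK-only sketch of how a specification such as `"0-5, 7, 10-15"` expands into individual partitions. It is an illustration of the inclusive-range rule, not the framework's parser; `PartitionRangeSketch` and `parse` are hypothetical names, and no validation of malformed input is attempted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of expanding a comma-delimited list of partitions and ranges.
final class PartitionRangeSketch {

    static List<Integer> parse(String spec) {
        List<Integer> partitions = new ArrayList<>();
        for (String part : spec.split(",")) {
            String token = part.trim();
            int dash = token.indexOf('-');
            if (dash < 0) {
                // A single partition, such as "7".
                partitions.add(Integer.parseInt(token));
            } else {
                // An inclusive range, such as "10-15".
                int start = Integer.parseInt(token.substring(0, dash).trim());
                int end = Integer.parseInt(token.substring(dash + 1).trim());
                for (int p = start; p <= end; p++) {
                    partitions.add(p);
                }
            }
        }
        return partitions;
    }
}
```

Under this rule, `"0-5"` yields six partitions, which matches the count given for the initial-offset example.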
###### Manual Acknowledgment
When using manual `AckMode`, you can also provide the listener with the `Acknowledgment`.
The following example also shows how to use a different container factory.
```
@KafkaListener(id = "cat", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
```
###### Consumer Record Metadata
Finally, metadata about the record is available from message headers.
You can use the following header names to retrieve the headers of the message:
* `KafkaHeaders.OFFSET`
* `KafkaHeaders.RECEIVED_MESSAGE_KEY`
* `KafkaHeaders.RECEIVED_TOPIC`
* `KafkaHeaders.RECEIVED_PARTITION_ID`
* `KafkaHeaders.RECEIVED_TIMESTAMP`
* `KafkaHeaders.TIMESTAMP_TYPE`
Starting with version 2.5, the `RECEIVED_MESSAGE_KEY` header is not present if the incoming record has a `null` key; previously the header was populated with a `null` value.
This change is to make the framework consistent with `spring-messaging` conventions where `null` valued headers are not present.
The following example shows how to use the headers:
```
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
...
}
```
Starting with version 2.5, instead of using discrete headers, you can receive record metadata in a `ConsumerRecordMetadata` parameter.
```
@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
...
}
```
This contains all the data from the `ConsumerRecord` except the key and value.
###### Batch Listeners
Starting with version 1.1, you can configure `@KafkaListener` methods to receive the entire batch of consumer records received from the consumer poll.
To configure the listener container factory to create batch listeners, you can set the `batchListener` property.
The following example shows how to do so:
```
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true); // create batch listeners instead of record listeners
return factory;
}
```
| |Starting with version 2.8, you can override the factory’s `batchListener` property using the `batch` property on the `@KafkaListener` annotation. This, together with the changes to [Container Error Handlers](#error-handlers), allows the same factory to be used for both record and batch listeners.|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
The following example shows how to receive a list of payloads:
```
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
...
}
```
The topic, partition, offset, and so on are available in headers that parallel the payloads.
The following example shows how to use the headers:
```
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
```
Alternatively, you can receive a `List` of `Message<?>` objects with each offset and other details in each message, but it must be the only parameter (aside from optional `Acknowledgment`, when using manual commits, and/or `Consumer<?, ?>` parameters) defined on the method.
The following example shows how to do so:
```
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen14(List<Message<?>> list) {
...
}
@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen15(List<Message<?>> list, Acknowledgment ack) {
...
}
@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen16(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
...
}
```
No conversion is performed on the payloads in this case.
If the `BatchMessagingMessageConverter` is configured with a `RecordMessageConverter`, you can also add a generic type to the `Message` parameter and the payloads are converted.
See [Payload Conversion with Batch Listeners](#payload-conversion-with-batch) for more information.
You can also receive a list of `ConsumerRecord<?, ?>` objects, but it must be the only parameter (aside from optional `Acknowledgment`, when using manual commits, and/or `Consumer<?, ?>` parameters) defined on the method.
The following example shows how to do so:
```
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
...
}
@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
...
}
```
Starting with version 2.2, the listener can receive the complete `ConsumerRecords<?, ?>` object returned by the `poll()` method, letting the listener access additional methods, such as `partitions()` (which returns the `TopicPartition` instances in the list) and `records(TopicPartition)` (which gets selective records).
Again, this must be the only parameter (aside from optional `Acknowledgment`, when using manual commits, and/or `Consumer<?, ?>` parameters) on the method.
The following example shows how to do so:
```
@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
...
}
```
| |If the container factory has a `RecordFilterStrategy` configured, it is ignored for `ConsumerRecords<?, ?>` listeners, with a `WARN` log message emitted. Records can only be filtered with a batch listener if the `List<?>` form of listener is used. By default, records are filtered one-at-a-time; starting with version 2.8, you can override `filterBatch` to filter the entire batch in one call.|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
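The difference between filtering one record at a time and filtering a whole batch can be pictured with a JDK-only sketch. It assumes the usual filter-strategy convention that the predicate returns `true` for records to discard; `BatchFilterSketch` and its `filterBatch` method are hypothetical stand-ins and do not reproduce the framework's types.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical sketch: apply a per-record discard test across an entire batch in one call.
final class BatchFilterSketch {

    // Returns the records that survive filtering; discard.test(r) == true drops r.
    static <T> List<T> filterBatch(List<T> records, Predicate<T> discard) {
        return records.stream()
                .filter(record -> !discard.test(record))
                .collect(Collectors.toList());
    }
}
```

A batch-level override is useful when the decision depends on more than one record, for example when all records of an aborted unit of work should be dropped together.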
###### Annotation Properties
Starting with version 2.0, the `id` property (if present) is used as the Kafka consumer `group.id` property, overriding the configured property in the consumer factory, if present.
You can also set `groupId` explicitly or set `idIsGroup` to false to restore the previous behavior of using the consumer factory `group.id`.
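The precedence described above can be summarized in a few lines. This is a sketch of the rule as stated here, not the framework's code; `GroupIdSketch` and `resolve` are hypothetical names, and `factoryGroupId` stands for whatever `group.id` is configured in the consumer factory.

```java
// Hypothetical sketch of how the effective group.id is chosen for a listener.
final class GroupIdSketch {

    static String resolve(String groupId, String id, boolean idIsGroup, String factoryGroupId) {
        // An explicit groupId on the annotation wins.
        if (groupId != null && !groupId.isEmpty()) {
            return groupId;
        }
        // Otherwise the id is used as the group, unless idIsGroup is false.
        if (idIsGroup && id != null && !id.isEmpty()) {
            return id;
        }
        // Fall back to the consumer factory's group.id.
        return factoryGroupId;
    }
}
```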
You can use property placeholders or SpEL expressions within most annotation properties, as the following example shows:
```
@KafkaListener(topics = "${some.property}")
@KafkaListener(topics = "#{someBean.someProperty}",
groupId = "#{someBean.someProperty}.group")
```
Starting with version 2.1.2, the SpEL expressions support a special token: `__listener`.
It is a pseudo bean name that represents the current bean instance within which this annotation exists.
Consider the following example:
```
@Bean
public Listener listener1() {
return new Listener("topic1");
}
@Bean
public Listener listener2() {
return new Listener("topic2");
}
```
Given the beans in the previous example, we can then use the following:
```
public class Listener {
private final String topic;
public Listener(String topic) {
this.topic = topic;
}
@KafkaListener(topics = "#{__listener.topic}",
groupId = "#{__listener.topic}.group")
public void listen(...) {
...
}
public String getTopic() {
return this.topic;
}
}
```
In the unlikely event that you have an actual bean called `__listener`, you can change the expression token by using the `beanRef` attribute.
The following example shows how to do so:
```
@KafkaListener(beanRef = "__x", topics = "#{__x.topic}",
groupId = "#{__x.topic}.group")
```
Starting with version 2.2.4, you can specify Kafka consumer properties directly on the annotation; these override any properties with the same name configured in the consumer factory. You **cannot** specify the `group.id` and `client.id` properties this way; they are ignored. Use the `groupId` and `clientIdPrefix` annotation properties for those.
The properties are specified as individual strings with the normal Java `Properties` file format: `foo:bar`, `foo=bar`, or `foo bar`.
```
@KafkaListener(topics = "myTopic", groupId = "group", properties = {
"max.poll.interval.ms:60000",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})
```
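The three separator styles are those understood by `java.util.Properties`. The following JDK-only sketch shows how such strings parse into key/value pairs; the `AnnotationPropertiesSketch` class is hypothetical, and joining the entries with newlines is a simplification chosen for the example rather than a description of how the framework processes them.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical sketch: parse annotation-style property strings with java.util.Properties.
final class AnnotationPropertiesSketch {

    static Properties parse(String... entries) {
        Properties props = new Properties();
        try {
            // Each entry is one line in Properties file format: key:value, key=value, or key value.
            props.load(new StringReader(String.join("\n", entries)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }
}
```

Calling `parse("max.poll.interval.ms:60000", "max.poll.records=100", "fetch.min.bytes 1024")` produces three entries, one for each separator style. Note that all values are strings at this point.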
The following is an example of the corresponding listeners for the example in [Using `RoutingKafkaTemplate`](#routing-template).
```
@KafkaListener(id = "one", topics = "one")
public void listen1(String in) {
System.out.println("1: " + in);
}
@KafkaListener(id = "two", topics = "two",
properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
System.out.println("2: " + new String(in));
}
```
##### Obtaining the Consumer `group.id`
When running the same listener code in multiple containers, it may be useful to determine which container (identified by its `group.id` consumer property) a record came from.
You can call `KafkaUtils.getConsumerGroupId()` on the listener thread to do this.
Alternatively, you can access the group id in a method parameter.
```
@KafkaListener(id = "bar", topicPattern = "${topicTwo:annotated2}", exposeGroupId = "${always:true}")
public void listener(@Payload String foo,
@Header(KafkaHeaders.GROUP_ID) String groupId) {
...
}
```
| |This is available in record listeners and batch listeners that receive a `List<?>` of records. It is **not** available in a batch listener that receives a `ConsumerRecords<?, ?>` argument. Use the `KafkaUtils` mechanism in that case.|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
##### Container Thread Naming
Listener containers currently use two task executors: one to invoke the consumer and another to invoke the listener when the Kafka consumer property `enable.auto.commit` is `false`.
You can provide custom executors by setting the `consumerExecutor` and `listenerExecutor` properties of the container’s `ContainerProperties`.
When using pooled executors, be sure that enough threads are available to handle the concurrency across all the containers in which they are used.
When using the `ConcurrentMessageListenerContainer`, a thread from each is used for each consumer (`concurrency`).
If you do not provide a consumer executor, a `SimpleAsyncTaskExecutor` is used.
This executor creates threads with names similar to `<beanName>-C-1` (consumer thread).
For the `ConcurrentMessageListenerContainer`, the `<beanName>` part of the thread name becomes `<beanName>-m`, where `m` represents the consumer instance. The trailing number (`1` in `-C-1`) increments each time the container is started.
So, with a bean name of `container`, threads in this container will be named `container-0-C-1`, `container-1-C-1` etc., after the container is started the first time; `container-0-C-2`, `container-1-C-2` etc., after a stop and subsequent start.
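The naming pattern can be written out as a one-line formatter. This only mirrors the pattern described above for the default executor; `ThreadNameSketch` and its parameters are hypothetical names for illustration.

```java
// Hypothetical sketch of the default consumer thread naming pattern.
final class ThreadNameSketch {

    // beanName: container bean name; consumerIndex: the "m" part; startCount: increments on each start.
    static String consumerThreadName(String beanName, int consumerIndex, int startCount) {
        return beanName + "-" + consumerIndex + "-C-" + startCount;
    }
}
```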
##### `@KafkaListener` as a Meta Annotation
Starting with version 2.2, you can now use `@KafkaListener` as a meta annotation.
The following example shows how to do so:
```
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@KafkaListener
public @interface MyThreeConsumersListener {
@AliasFor(annotation = KafkaListener.class, attribute = "id")
String id();
@AliasFor(annotation = KafkaListener.class, attribute = "topics")
String[] topics();
@AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
String concurrency() default "3";
}
```
You must alias at least one of `topics`, `topicPattern`, or `topicPartitions` (and, usually, `id` or `groupId` unless you have specified a `group.id` in the consumer factory configuration).
The following example shows how to do so:
```
@MyThreeConsumersListener(id = "my.group", topics = "my.topic")
public void listen1(String in) {
...
}
```
##### `@KafkaListener` on a Class
When you use `@KafkaListener` at the class-level, you must specify `@KafkaHandler` at the method level.
When messages are delivered, the converted message payload type is used to determine which method to call.
The following example shows how to do so:
```
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String foo) {
...
}
@KafkaHandler
public void listen(Integer bar) {
...
}
@KafkaHandler(isDefault = true)
public void listenDefault(Object object) {
...
}
}
```
Starting with version 2.1.3, you can designate a `@KafkaHandler` method as the default method that is invoked if there is no match on other methods.
At most, one method can be so designated.
When using `@KafkaHandler` methods, the payload must have already been converted to the domain object (so the match can be performed).
Use a custom deserializer, the `JsonDeserializer`, or the `JsonMessageConverter` with its `TypePrecedence` set to `TYPE_ID`.
See [Serialization, Deserialization, and Message Conversion](#serdes) for more information.
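The reason the payload must already be a domain object can be seen in a framework-free sketch of type-based dispatch. It is a simplified model, not the adapter's actual resolution logic; `HandlerDispatchSketch` and its methods are hypothetical names. Handlers are matched on the runtime type of the payload, and the default handler is used only when nothing else matches.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of choosing a handler method by converted payload type.
final class HandlerDispatchSketch {

    private final Map<Class<?>, Function<Object, String>> handlers = new LinkedHashMap<>();
    private Function<Object, String> defaultHandler;

    void register(Class<?> payloadType, Function<Object, String> handler) {
        handlers.put(payloadType, handler);
    }

    void registerDefault(Function<Object, String> handler) {
        this.defaultHandler = handler;
    }

    String dispatch(Object payload) {
        // The first handler whose declared type matches the payload wins.
        for (Map.Entry<Class<?>, Function<Object, String>> entry : handlers.entrySet()) {
            if (entry.getKey().isInstance(payload)) {
                return entry.getValue().apply(payload);
            }
        }
        // No typed handler matched: fall back to the default, if one was designated.
        if (defaultHandler != null) {
            return defaultHandler.apply(payload);
        }
        throw new IllegalStateException("No handler for " + payload.getClass().getName());
    }
}
```

If every payload arrived as a raw `byte[]` or `String`, the type match would always select the same handler, which is why conversion has to happen before dispatch.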
| |Due to some limitations in the way Spring resolves method arguments, a default `@KafkaHandler` cannot receive discrete headers; it must use the `ConsumerRecordMetadata` as discussed in [Consumer Record Metadata](#consumer-record-metadata).|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
For example:
```
@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
...
}
```
This won’t work if the object is a `String`; the `topic` parameter will also get a reference to `object`.
If you need metadata about the record in a default method, use this:
```
@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
String topic = meta.topic();
...
}
```
##### `@KafkaListener` Attribute Modification
Starting with version 2.7.2, you can now programmatically modify annotation attributes before the container is created.
To do so, add one or more `KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer` to the application context. `AnnotationEnhancer` is a `BiFunction