sendAndReceive(Message> message);\n\n RequestReplyTypedMessageFuture sendAndReceive(Message> message,\n ParameterizedTypeReference returnType);\n")])])]),a("p",[e._v("These will use the template’s default "),a("code",[e._v("replyTimeout")]),e._v(", there are also overloaded versions that can take a timeout in the method call.")]),e._v(" "),a("p",[e._v("Use the first method if the consumer’s "),a("code",[e._v("Deserializer")]),e._v(" or the template’s "),a("code",[e._v("MessageConverter")]),e._v(" can convert the payload without any additional information, either via configuration or type metadata in the reply message.")]),e._v(" "),a("p",[e._v("Use the second method if you need to provide type information for the return type, to assist the message converter.\nThis also allows the same template to receive different types, even if there is no type metadata in the replies, such as when the server side is not a Spring application.\nThe following is an example of the latter:")]),e._v(" "),a("p",[e._v("Example 6. Template Bean")]),e._v(" "),a("p",[e._v("Java")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@Bean\nReplyingKafkaTemplate template(\n ProducerFactory pf,\n ConcurrentKafkaListenerContainerFactory factory) {\n\n ConcurrentMessageListenerContainer replyContainer =\n factory.createContainer("replies");\n replyContainer.getContainerProperties().setGroupId("request.replies");\n ReplyingKafkaTemplate template =\n new ReplyingKafkaTemplate<>(pf, replyContainer);\n template.setMessageConverter(new ByteArrayJsonMessageConverter());\n template.setDefaultTopic("requests");\n return template;\n}\n')])])]),a("p",[e._v("Kotlin")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@Bean\nfun template(\n pf: ProducerFactory?,\n factory: ConcurrentKafkaListenerContainerFactory\n): ReplyingKafkaTemplate {\n val replyContainer = factory.createContainer("replies")\n replyContainer.containerProperties.groupId = "request.replies"\n val template = ReplyingKafkaTemplate(pf, replyContainer)\n template.messageConverter = ByteArrayJsonMessageConverter()\n template.defaultTopic = "requests"\n return template\n}\n')])])]),a("p",[e._v("Example 7. Using the template")]),e._v(" "),a("p",[e._v("Java")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('RequestReplyTypedMessageFuture future1 =\n template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),\n new ParameterizedTypeReference() { });\nlog.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());\nThing thing = future1.get(10, TimeUnit.SECONDS).getPayload();\nlog.info(thing.toString());\n\nRequestReplyTypedMessageFuture> future2 =\n template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),\n new ParameterizedTypeReference>() { });\nlog.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());\nList things = future2.get(10, TimeUnit.SECONDS).getPayload();\nthings.forEach(thing1 -> log.info(thing1.toString()));\n')])])]),a("p",[e._v("Kotlin")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('val future1: RequestReplyTypedMessageFuture? =\n template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),\n object : ParameterizedTypeReference() {})\nlog.info(future1?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata?.toString())\nval thing = future1?.get(10, TimeUnit.SECONDS)?.payload\nlog.info(thing.toString())\n\nval future2: RequestReplyTypedMessageFuture?>? =\n template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),\n object : ParameterizedTypeReference?>() {})\nlog.info(future2?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata.toString())\nval things = future2?.get(10, TimeUnit.SECONDS)?.payload\nthings?.forEach(Consumer { thing1: Thing? -> log.info(thing1.toString()) })\n')])])]),a("h5",{attrs:{id:"reply-type-message"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#reply-type-message"}},[e._v("#")]),e._v(" Reply Type Message>")]),e._v(" "),a("p",[e._v("When the "),a("code",[e._v("@KafkaListener")]),e._v(" returns a "),a("code",[e._v("Message>")]),e._v(", with versions before 2.5, it was necessary to populate the reply topic and correlation id headers.\nIn this example, we use the reply topic header from the request:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@KafkaListener(id = "requestor", topics = "request")\n@SendTo\npublic Message> messageReturn(String in) {\n return MessageBuilder.withPayload(in.toUpperCase())\n .setHeader(KafkaHeaders.TOPIC, replyTo)\n .setHeader(KafkaHeaders.MESSAGE_KEY, 42)\n .setHeader(KafkaHeaders.CORRELATION_ID, correlation)\n .build();\n}\n')])])]),a("p",[e._v("This also shows how to set a key on the reply record.")]),e._v(" "),a("p",[e._v("Starting with version 2.5, the framework will detect if these headers are missing and populate them with the topic - either the topic determined from the "),a("code",[e._v("@SendTo")]),e._v(" value or the incoming "),a("code",[e._v("KafkaHeaders.REPLY_TOPIC")]),e._v(" header (if present).\nIt will also echo the incoming "),a("code",[e._v("KafkaHeaders.CORRELATION_ID")]),e._v(" and "),a("code",[e._v("KafkaHeaders.REPLY_PARTITION")]),e._v(", if present.")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@KafkaListener(id = "requestor", topics = "request")\n@SendTo // default REPLY_TOPIC header\npublic Message> messageReturn(String in) {\n return MessageBuilder.withPayload(in.toUpperCase())\n .setHeader(KafkaHeaders.MESSAGE_KEY, 42)\n .build();\n}\n')])])]),a("h5",{attrs:{id:"aggregating-multiple-replies"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#aggregating-multiple-replies"}},[e._v("#")]),e._v(" Aggregating Multiple Replies")]),e._v(" "),a("p",[e._v("The template in "),a("a",{attrs:{href:"#replying-template"}},[e._v("Using "),a("code",[e._v("ReplyingKafkaTemplate")])]),e._v(" is strictly for a single request/reply scenario.\nFor cases where multiple receivers of a single message return a reply, you can use the "),a("code",[e._v("AggregatingReplyingKafkaTemplate")]),e._v(".\nThis is an implementation of the client-side of the "),a("a",{attrs:{href:"https://www.enterpriseintegrationpatterns.com/patterns/messaging/BroadcastAggregate.html",target:"_blank",rel:"noopener noreferrer"}},[e._v("Scatter-Gather Enterprise Integration Pattern"),a("OutboundLink")],1),e._v(".")]),e._v(" "),a("p",[e._v("Like the "),a("code",[e._v("ReplyingKafkaTemplate")]),e._v(", the "),a("code",[e._v("AggregatingReplyingKafkaTemplate")]),e._v(" constructor takes a producer factory and a listener container to receive the replies; it has a third parameter "),a("code",[e._v("BiPredicate>, Boolean> releaseStrategy")]),e._v(" which is consulted each time a reply is received; when the predicate returns "),a("code",[e._v("true")]),e._v(", the collection of "),a("code",[e._v("ConsumerRecord")]),e._v(" s is used to complete the "),a("code",[e._v("Future")]),e._v(" returned by the "),a("code",[e._v("sendAndReceive")]),e._v(" method.")]),e._v(" "),a("p",[e._v("There is an additional property "),a("code",[e._v("returnPartialOnTimeout")]),e._v(" (default false).\nWhen this is set to "),a("code",[e._v("true")]),e._v(", instead of completing the future with a "),a("code",[e._v("KafkaReplyTimeoutException")]),e._v(", a partial result completes the future normally (as long as at least one reply record has been received).")]),e._v(" "),a("p",[e._v("Starting with version 2.3.5, the predicate is also called after a timeout (if "),a("code",[e._v("returnPartialOnTimeout")]),e._v(" is "),a("code",[e._v("true")]),e._v(").\nThe first argument is the current list of records; the second is "),a("code",[e._v("true")]),e._v(" if this call is due to a timeout.\nThe predicate can modify the list of records.")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("AggregatingReplyingKafkaTemplate template =\n new AggregatingReplyingKafkaTemplate<>(producerFactory, container,\n coll -> coll.size() == releaseSize);\n...\nRequestReplyFuture>> future =\n template.sendAndReceive(record);\nfuture.getSendFuture().get(10, TimeUnit.SECONDS); // send ok\nConsumerRecord>> consumerRecord =\n future.get(30, TimeUnit.SECONDS);\n")])])]),a("p",[e._v("Notice that the return type is a "),a("code",[e._v("ConsumerRecord")]),e._v(" with a value that is a collection of "),a("code",[e._v("ConsumerRecord")]),e._v(' s.\nThe "outer" '),a("code",[e._v("ConsumerRecord")]),e._v(' is not a "real" record, it is synthesized by the template, as a holder for the actual reply records received for the request.\nWhen a normal release occurs (release strategy returns true), the topic is set to '),a("code",[e._v("aggregatedResults")]),e._v("; if "),a("code",[e._v("returnPartialOnTimeout")]),e._v(" is true, and timeout occurs (and at least one reply record has been received), the topic is set to "),a("code",[e._v("partialResultsAfterTimeout")]),e._v('.\nThe template provides constant static variables for these "topic" names:')]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('/**\n * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated\n * results in its value after a normal release by the release strategy.\n */\npublic static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";\n\n/**\n * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated\n * results in its value after a timeout.\n */\npublic static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";\n')])])]),a("p",[e._v("The real "),a("code",[e._v("ConsumerRecord")]),e._v(" s in the "),a("code",[e._v("Collection")]),e._v(" contain the actual topic(s) from which the replies are received.")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("The listener container for the replies MUST be configured with "),a("code",[e._v("AckMode.MANUAL")]),e._v(" or "),a("code",[e._v("AckMode.MANUAL_IMMEDIATE")]),e._v("; the consumer property "),a("code",[e._v("enable.auto.commit")]),e._v(" must be "),a("code",[e._v("false")]),e._v(" (the default since version 2.3)."),a("br"),e._v("To avoid any possibility of losing messages, the template only commits offsets when there are zero requests outstanding, i.e. when the last outstanding request is released by the release strategy."),a("br"),e._v("After a rebalance, it is possible for duplicate reply deliveries; these will be ignored for any in-flight requests; you may see error log messages when duplicate replies are received for already released replies.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("If you use an "),a("a",{attrs:{href:"#error-handling-deserializer"}},[a("code",[e._v("ErrorHandlingDeserializer")])]),e._v(" with this aggregating template, the framework will not automatically detect "),a("code",[e._v("DeserializationException")]),e._v(" s."),a("br"),e._v("Instead, the record (with a "),a("code",[e._v("null")]),e._v(" value) will be returned intact, with the deserialization exception(s) in headers."),a("br"),e._v("It is recommended that applications call the utility method "),a("code",[e._v("ReplyingKafkaTemplate.checkDeserialization()")]),e._v(" method to determine if a deserialization exception occurred."),a("br"),e._v("See its javadocs for more information."),a("br"),e._v("The "),a("code",[e._v("replyErrorChecker")]),e._v(" is also not called for this aggregating template; you should perform the checks on each element of the reply.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("h4",{attrs:{id:"_4-1-4-receiving-messages"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#_4-1-4-receiving-messages"}},[e._v("#")]),e._v(" 4.1.4. Receiving Messages")]),e._v(" "),a("p",[e._v("You can receive messages by configuring a "),a("code",[e._v("MessageListenerContainer")]),e._v(" and providing a message listener or by using the "),a("code",[e._v("@KafkaListener")]),e._v(" annotation.")]),e._v(" "),a("h5",{attrs:{id:"message-listeners"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#message-listeners"}},[e._v("#")]),e._v(" Message Listeners")]),e._v(" "),a("p",[e._v("When you use a "),a("a",{attrs:{href:"#message-listener-container"}},[e._v("message listener container")]),e._v(", you must provide a listener to receive data.\nThere are currently eight supported interfaces for message listeners.\nThe following listing shows these interfaces:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("public interface MessageListener { (1)\n\n void onMessage(ConsumerRecord data);\n\n}\n\npublic interface AcknowledgingMessageListener { (2)\n\n void onMessage(ConsumerRecord data, Acknowledgment acknowledgment);\n\n}\n\npublic interface ConsumerAwareMessageListener extends MessageListener { (3)\n\n void onMessage(ConsumerRecord data, Consumer, ?> consumer);\n\n}\n\npublic interface AcknowledgingConsumerAwareMessageListener extends MessageListener { (4)\n\n void onMessage(ConsumerRecord data, Acknowledgment acknowledgment, Consumer, ?> consumer);\n\n}\n\npublic interface BatchMessageListener { (5)\n\n void onMessage(List> data);\n\n}\n\npublic interface BatchAcknowledgingMessageListener { (6)\n\n void onMessage(List> data, Acknowledgment acknowledgment);\n\n}\n\npublic interface BatchConsumerAwareMessageListener extends BatchMessageListener { (7)\n\n void onMessage(List> data, Consumer, ?> consumer);\n\n}\n\npublic interface BatchAcknowledgingConsumerAwareMessageListener extends BatchMessageListener { (8)\n\n void onMessage(List> data, Acknowledgment acknowledgment, Consumer, ?> consumer);\n\n}\n")])])]),a("table",[a("thead",[a("tr",[a("th",[a("strong",[e._v("1")])]),e._v(" "),a("th",[e._v("Use this interface for processing individual "),a("code",[e._v("ConsumerRecord")]),e._v(" instances received from the Kafka consumer "),a("code",[e._v("poll()")]),e._v(" operation when using auto-commit or one of the container-managed "),a("a",{attrs:{href:"#committing-offsets"}},[e._v("commit methods")]),e._v(".")])])]),e._v(" "),a("tbody",[a("tr",[a("td",[a("strong",[e._v("2")])]),e._v(" "),a("td",[e._v("Use this interface for processing individual "),a("code",[e._v("ConsumerRecord")]),e._v(" instances received from the Kafka consumer "),a("code",[e._v("poll()")]),e._v(" operation when using one of the manual "),a("a",{attrs:{href:"#committing-offsets"}},[e._v("commit methods")]),e._v(".")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("3")])]),e._v(" "),a("td",[e._v("Use this interface for processing individual "),a("code",[e._v("ConsumerRecord")]),e._v(" instances received from the Kafka consumer "),a("code",[e._v("poll()")]),e._v(" operation when using auto-commit or one of the container-managed "),a("a",{attrs:{href:"#committing-offsets"}},[e._v("commit methods")]),e._v("."),a("br"),e._v("Access to the "),a("code",[e._v("Consumer")]),e._v(" object is provided.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("4")])]),e._v(" "),a("td",[e._v("Use this interface for processing individual "),a("code",[e._v("ConsumerRecord")]),e._v(" instances received from the Kafka consumer "),a("code",[e._v("poll()")]),e._v(" operation when using one of the manual "),a("a",{attrs:{href:"#committing-offsets"}},[e._v("commit methods")]),e._v("."),a("br"),e._v("Access to the "),a("code",[e._v("Consumer")]),e._v(" object is provided.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("5")])]),e._v(" "),a("td",[e._v("Use this interface for processing all "),a("code",[e._v("ConsumerRecord")]),e._v(" instances received from the Kafka consumer "),a("code",[e._v("poll()")]),e._v(" operation when using auto-commit or one of the container-managed "),a("a",{attrs:{href:"#committing-offsets"}},[e._v("commit methods")]),e._v("."),a("code",[e._v("AckMode.RECORD")]),e._v(" is not supported when you use this interface, since the listener is given the complete batch.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("6")])]),e._v(" "),a("td",[e._v("Use this interface for processing all "),a("code",[e._v("ConsumerRecord")]),e._v(" instances received from the Kafka consumer "),a("code",[e._v("poll()")]),e._v(" operation when using one of the manual "),a("a",{attrs:{href:"#committing-offsets"}},[e._v("commit methods")]),e._v(".")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("7")])]),e._v(" "),a("td",[e._v("Use this interface for processing all "),a("code",[e._v("ConsumerRecord")]),e._v(" instances received from the Kafka consumer "),a("code",[e._v("poll()")]),e._v(" operation when using auto-commit or one of the container-managed "),a("a",{attrs:{href:"#committing-offsets"}},[e._v("commit methods")]),e._v("."),a("code",[e._v("AckMode.RECORD")]),e._v(" is not supported when you use this interface, since the listener is given the complete batch."),a("br"),e._v("Access to the "),a("code",[e._v("Consumer")]),e._v(" object is provided.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("8")])]),e._v(" "),a("td",[e._v("Use this interface for processing all "),a("code",[e._v("ConsumerRecord")]),e._v(" instances received from the Kafka consumer "),a("code",[e._v("poll()")]),e._v(" operation when using one of the manual "),a("a",{attrs:{href:"#committing-offsets"}},[e._v("commit methods")]),e._v("."),a("br"),e._v("Access to the "),a("code",[e._v("Consumer")]),e._v(" object is provided.")])])])]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("The "),a("code",[e._v("Consumer")]),e._v(" object is not thread-safe."),a("br"),e._v("You must only invoke its methods on the thread that calls the listener.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("You should not execute any "),a("code",[e._v("Consumer, ?>")]),e._v(" methods that affect the consumer’s positions and or committed offsets in your listener; the container needs to manage such information.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("h5",{attrs:{id:"message-listener-containers"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#message-listener-containers"}},[e._v("#")]),e._v(" Message Listener Containers")]),e._v(" "),a("p",[e._v("Two "),a("code",[e._v("MessageListenerContainer")]),e._v(" implementations are provided:")]),e._v(" "),a("ul",[a("li",[a("p",[a("code",[e._v("KafkaMessageListenerContainer")])])]),e._v(" "),a("li",[a("p",[a("code",[e._v("ConcurrentMessageListenerContainer")])])])]),e._v(" "),a("p",[e._v("The "),a("code",[e._v("KafkaMessageListenerContainer")]),e._v(" receives all message from all topics or partitions on a single thread.\nThe "),a("code",[e._v("ConcurrentMessageListenerContainer")]),e._v(" delegates to one or more "),a("code",[e._v("KafkaMessageListenerContainer")]),e._v(" instances to provide multi-threaded consumption.")]),e._v(" "),a("p",[e._v("Starting with version 2.2.7, you can add a "),a("code",[e._v("RecordInterceptor")]),e._v(" to the listener container; it will be invoked before calling the listener allowing inspection or modification of the record.\nIf the interceptor returns null, the listener is not called.\nStarting with version 2.7, it has additional methods which are called after the listener exits (normally, or by throwing an exception).\nAlso, starting with version 2.7, there is now a "),a("code",[e._v("BatchInterceptor")]),e._v(", providing similar functionality for "),a("a",{attrs:{href:"#batch-listeners"}},[e._v("Batch Listeners")]),e._v(".\nIn addition, the "),a("code",[e._v("ConsumerAwareRecordInterceptor")]),e._v(" (and "),a("code",[e._v("BatchInterceptor")]),e._v(") provide access to the "),a("code",[e._v("Consumer, ?>")]),e._v(".\nThis might be used, for example, to access the consumer metrics in the interceptor.")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("You should not execute any methods that affect the consumer’s positions and or committed offsets in these interceptors; the container needs to manage such information.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("The "),a("code",[e._v("CompositeRecordInterceptor")]),e._v(" and "),a("code",[e._v("CompositeBatchInterceptor")]),e._v(" can be used to invoke multiple interceptors.")]),e._v(" "),a("p",[e._v("By default, starting with version 2.8, when using transactions, the interceptor is invoked before the transaction has started.\nYou can set the listener container’s "),a("code",[e._v("interceptBeforeTx")]),e._v(" property to "),a("code",[e._v("false")]),e._v(" to invoke the interceptor after the transaction has started instead.")]),e._v(" "),a("p",[e._v("Starting with versions 2.3.8, 2.4.6, the "),a("code",[e._v("ConcurrentMessageListenerContainer")]),e._v(" now supports "),a("a",{attrs:{href:"https://kafka.apache.org/documentation/#static_membership",target:"_blank",rel:"noopener noreferrer"}},[e._v("Static Membership"),a("OutboundLink")],1),e._v(" when the concurrency is greater than one.\nThe "),a("code",[e._v("group.instance.id")]),e._v(" is suffixed with "),a("code",[e._v("-n")]),e._v(" with "),a("code",[e._v("n")]),e._v(" starting at "),a("code",[e._v("1")]),e._v(".\nThis, together with an increased "),a("code",[e._v("session.timeout.ms")]),e._v(", can be used to reduce rebalance events, for example, when application instances are restarted.")]),e._v(" "),a("h6",{attrs:{id:"using-kafkamessagelistenercontainer"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#using-kafkamessagelistenercontainer"}},[e._v("#")]),e._v(" Using "),a("code",[e._v("KafkaMessageListenerContainer")])]),e._v(" "),a("p",[e._v("The following constructor is available:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("public KafkaMessageListenerContainer(ConsumerFactory consumerFactory,\n ContainerProperties containerProperties)\n")])])]),a("p",[e._v("It receives a "),a("code",[e._v("ConsumerFactory")]),e._v(" and information about topics and partitions, as well as other configuration, in a "),a("code",[e._v("ContainerProperties")]),e._v("object."),a("code",[e._v("ContainerProperties")]),e._v(" has the following constructors:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("public ContainerProperties(TopicPartitionOffset... topicPartitions)\n\npublic ContainerProperties(String... topics)\n\npublic ContainerProperties(Pattern topicPattern)\n")])])]),a("p",[e._v("The first constructor takes an array of "),a("code",[e._v("TopicPartitionOffset")]),e._v(" arguments to explicitly instruct the container about which partitions to use (using the consumer "),a("code",[e._v("assign()")]),e._v(" method) and with an optional initial offset.\nA positive value is an absolute offset by default.\nA negative value is relative to the current last offset within a partition by default.\nA constructor for "),a("code",[e._v("TopicPartitionOffset")]),e._v(" that takes an additional "),a("code",[e._v("boolean")]),e._v(" argument is provided.\nIf this is "),a("code",[e._v("true")]),e._v(", the initial offsets (positive or negative) are relative to the current position for this consumer.\nThe offsets are applied when the container is started.\nThe second takes an array of topics, and Kafka allocates the partitions based on the "),a("code",[e._v("group.id")]),e._v(" property — distributing partitions across the group.\nThe third uses a regex "),a("code",[e._v("Pattern")]),e._v(" to select the topics.")]),e._v(" "),a("p",[e._v("To assign a "),a("code",[e._v("MessageListener")]),e._v(" to a container, you can use the "),a("code",[e._v("ContainerProps.setMessageListener")]),e._v(" method when creating the Container.\nThe following example shows how to do so:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");\ncontainerProps.setMessageListener(new MessageListener() {\n ...\n});\nDefaultKafkaConsumerFactory cf =\n new DefaultKafkaConsumerFactory<>(consumerProps());\nKafkaMessageListenerContainer container =\n new KafkaMessageListenerContainer<>(cf, containerProps);\nreturn container;\n')])])]),a("p",[e._v("Note that when creating a "),a("code",[e._v("DefaultKafkaConsumerFactory")]),e._v(", using the constructor that just takes in the properties as above means that key and value "),a("code",[e._v("Deserializer")]),e._v(" classes are picked up from configuration.\nAlternatively, "),a("code",[e._v("Deserializer")]),e._v(" instances may be passed to the "),a("code",[e._v("DefaultKafkaConsumerFactory")]),e._v(" constructor for key and/or value, in which case all Consumers share the same instances.\nAnother option is to provide "),a("code",[e._v("Supplier")]),e._v(" s (starting with version 2.3) that will be used to obtain separate "),a("code",[e._v("Deserializer")]),e._v(" instances for each "),a("code",[e._v("Consumer")]),e._v(":")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("DefaultKafkaConsumerFactory cf =\n new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());\nKafkaMessageListenerContainer container =\n new KafkaMessageListenerContainer<>(cf, containerProps);\nreturn container;\n")])])]),a("p",[e._v("Refer to the "),a("a",{attrs:{href:"https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ContainerProperties.html",target:"_blank",rel:"noopener noreferrer"}},[e._v("Javadoc"),a("OutboundLink")],1),e._v(" for "),a("code",[e._v("ContainerProperties")]),e._v(" for more information about the various properties that you can set.")]),e._v(" "),a("p",[e._v("Since version 2.1.1, a new property called "),a("code",[e._v("logContainerConfig")]),e._v(" is available.\nWhen "),a("code",[e._v("true")]),e._v(" and "),a("code",[e._v("INFO")]),e._v(" logging is enabled each listener container writes a log message summarizing its configuration properties.")]),e._v(" "),a("p",[e._v("By default, logging of topic offset commits is performed at the "),a("code",[e._v("DEBUG")]),e._v(" logging level.\nStarting with version 2.1.2, a property in "),a("code",[e._v("ContainerProperties")]),e._v(" called "),a("code",[e._v("commitLogLevel")]),e._v(" lets you specify the log level for these messages.\nFor example, to change the log level to "),a("code",[e._v("INFO")]),e._v(", you can use "),a("code",[e._v("containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);")]),e._v(".")]),e._v(" "),a("p",[e._v("Starting with version 2.2, a new container property called "),a("code",[e._v("missingTopicsFatal")]),e._v(" has been added (default: "),a("code",[e._v("false")]),e._v(" since 2.3.4).\nThis prevents the container from starting if any of the configured topics are not present on the broker.\nIt does not apply if the container is configured to listen to a topic pattern (regex).\nPreviously, the container threads looped within the "),a("code",[e._v("consumer.poll()")]),e._v(" method waiting for the topic to appear while logging many messages.\nAside from the logs, there was no indication that there was a problem.")]),e._v(" "),a("p",[e._v("As of version 2.8, a new container property "),a("code",[e._v("authExceptionRetryInterval")]),e._v(" has been introduced.\nThis causes the container to retry fetching messages after getting any "),a("code",[e._v("AuthenticationException")]),e._v(" or "),a("code",[e._v("AuthorizationException")]),e._v(" from the "),a("code",[e._v("KafkaConsumer")]),e._v(".\nThis can happen when, for example, the configured user is denied access to read a certain topic or credentials are incorrect.\nDefining "),a("code",[e._v("authExceptionRetryInterval")]),e._v(" allows the container to recover when proper permissions are granted.")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("By default, no interval is configured - authentication and authorization errors are considered fatal, which causes the container to stop.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("Starting with version 2.8, when creating the consumer factory, if you provide deserializers as objects (in the constructor or via the setters), the factory will invoke the "),a("code",[e._v("configure()")]),e._v(" method to configure them with the configuration properties.")]),e._v(" "),a("h6",{attrs:{id:"using-concurrentmessagelistenercontainer"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#using-concurrentmessagelistenercontainer"}},[e._v("#")]),e._v(" Using "),a("code",[e._v("ConcurrentMessageListenerContainer")])]),e._v(" "),a("p",[e._v("The single constructor is similar to the "),a("code",[e._v("KafkaListenerContainer")]),e._v(" constructor.\nThe following listing shows the constructor’s signature:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("public ConcurrentMessageListenerContainer(ConsumerFactory consumerFactory,\n ContainerProperties containerProperties)\n")])])]),a("p",[e._v("It also has a "),a("code",[e._v("concurrency")]),e._v(" property.\nFor example, "),a("code",[e._v("container.setConcurrency(3)")]),e._v(" creates three "),a("code",[e._v("KafkaMessageListenerContainer")]),e._v(" instances.")]),e._v(" "),a("p",[e._v("For the first constructor, Kafka distributes the partitions across the consumers using its group management capabilities.")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("When listening to multiple topics, the default partition distribution may not be what you expect."),a("br"),e._v("For example, if you have three topics with five partitions each and you want to use "),a("code",[e._v("concurrency=15")]),e._v(", you see only five active consumers, each assigned one partition from each topic, with the other 10 consumers being idle."),a("br"),e._v("This is because the default Kafka "),a("code",[e._v("PartitionAssignor")]),e._v(" is the "),a("code",[e._v("RangeAssignor")]),e._v(" (see its Javadoc)."),a("br"),e._v("For this scenario, you may want to consider using the "),a("code",[e._v("RoundRobinAssignor")]),e._v(" instead, which distributes the partitions across all of the consumers."),a("br"),e._v("Then, each consumer is assigned one topic or partition."),a("br"),e._v("To change the "),a("code",[e._v("PartitionAssignor")]),e._v(", you can set the "),a("code",[e._v("partition.assignment.strategy")]),e._v(" consumer property ("),a("code",[e._v("ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG")]),e._v(") in the properties provided to the "),a("code",[e._v("DefaultKafkaConsumerFactory")]),e._v("."),a("br"),a("br"),e._v("When using Spring Boot, you can assign set the strategy as follows:"),a("br"),a("br"),a("code",[e._v("
spring.kafka.consumer.properties.partition.assignment.strategy=\\
org.apache.kafka.clients.consumer.RoundRobinAssignor
")])])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("When the container properties are configured with "),a("code",[e._v("TopicPartitionOffset")]),e._v(" s, the "),a("code",[e._v("ConcurrentMessageListenerContainer")]),e._v(" distributes the "),a("code",[e._v("TopicPartitionOffset")]),e._v(" instances across the delegate "),a("code",[e._v("KafkaMessageListenerContainer")]),e._v(" instances.")]),e._v(" "),a("p",[e._v("If, say, six "),a("code",[e._v("TopicPartitionOffset")]),e._v(" instances are provided and the "),a("code",[e._v("concurrency")]),e._v(" is "),a("code",[e._v("3")]),e._v("; each container gets two partitions.\nFor five "),a("code",[e._v("TopicPartitionOffset")]),e._v(" instances, two containers get two partitions, and the third gets one.\nIf the "),a("code",[e._v("concurrency")]),e._v(" is greater than the number of "),a("code",[e._v("TopicPartitions")]),e._v(", the "),a("code",[e._v("concurrency")]),e._v(" is adjusted down such that each container gets one partition.")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("The "),a("code",[e._v("client.id")]),e._v(" property (if set) is appended with "),a("code",[e._v("-n")]),e._v(" where "),a("code",[e._v("n")]),e._v(" is the consumer instance that corresponds to the concurrency."),a("br"),e._v("This is required to provide unique names for MBeans when JMX is enabled.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("Starting with version 1.3, the "),a("code",[e._v("MessageListenerContainer")]),e._v(" provides access to the metrics of the underlying "),a("code",[e._v("KafkaConsumer")]),e._v(".\nIn the case of "),a("code",[e._v("ConcurrentMessageListenerContainer")]),e._v(", the "),a("code",[e._v("metrics()")]),e._v(" method returns the metrics for all the target "),a("code",[e._v("KafkaMessageListenerContainer")]),e._v(" instances.\nThe metrics are grouped into the "),a("code",[e._v("Map")]),e._v(" by the "),a("code",[e._v("client-id")]),e._v(" provided for the underlying "),a("code",[e._v("KafkaConsumer")]),e._v(".")]),e._v(" "),a("p",[e._v("Starting with version 2.3, the "),a("code",[e._v("ContainerProperties")]),e._v(" provides an "),a("code",[e._v("idleBetweenPolls")]),e._v(" option to let the main loop in the listener container to sleep between "),a("code",[e._v("KafkaConsumer.poll()")]),e._v(" calls.\nAn actual sleep interval is selected as the minimum from the provided option and difference between the "),a("code",[e._v("max.poll.interval.ms")]),e._v(" consumer config and the current records batch processing time.")]),e._v(" "),a("h6",{attrs:{id:"committing-offsets"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#committing-offsets"}},[e._v("#")]),e._v(" Committing Offsets")]),e._v(" "),a("p",[e._v("Several options are provided for committing offsets.\nIf the "),a("code",[e._v("enable.auto.commit")]),e._v(" consumer property is "),a("code",[e._v("true")]),e._v(", Kafka auto-commits the offsets according to its configuration.\nIf it is "),a("code",[e._v("false")]),e._v(", the containers support several "),a("code",[e._v("AckMode")]),e._v(" settings (described in the next list).\nThe default "),a("code",[e._v("AckMode")]),e._v(" is "),a("code",[e._v("BATCH")]),e._v(".\nStarting with version 2.3, the framework sets "),a("code",[e._v("enable.auto.commit")]),e._v(" to "),a("code",[e._v("false")]),e._v(" unless explicitly set in the configuration.\nPreviously, the Kafka default ("),a("code",[e._v("true")]),e._v(") was used if the property was not set.")]),e._v(" "),a("p",[e._v("The consumer "),a("code",[e._v("poll()")]),e._v(" method returns one or more "),a("code",[e._v("ConsumerRecords")]),e._v(".\nThe "),a("code",[e._v("MessageListener")]),e._v(" is called for each record.\nThe following lists describes the action taken by the container for each "),a("code",[e._v("AckMode")]),e._v(" (when transactions are not being used):")]),e._v(" "),a("ul",[a("li",[a("p",[a("code",[e._v("RECORD")]),e._v(": Commit the offset when the listener returns after processing the record.")])]),e._v(" "),a("li",[a("p",[a("code",[e._v("BATCH")]),e._v(": Commit the offset when all the records returned by the "),a("code",[e._v("poll()")]),e._v(" have been processed.")])]),e._v(" "),a("li",[a("p",[a("code",[e._v("TIME")]),e._v(": Commit the offset when all the records returned by the "),a("code",[e._v("poll()")]),e._v(" have been processed, as long as the "),a("code",[e._v("ackTime")]),e._v(" since the last commit has been exceeded.")])]),e._v(" "),a("li",[a("p",[a("code",[e._v("COUNT")]),e._v(": Commit the offset when all the records returned by the "),a("code",[e._v("poll()")]),e._v(" have been processed, as long as "),a("code",[e._v("ackCount")]),e._v(" records have been received since the last commit.")])]),e._v(" "),a("li",[a("p",[a("code",[e._v("COUNT_TIME")]),e._v(": Similar to "),a("code",[e._v("TIME")]),e._v(" and "),a("code",[e._v("COUNT")]),e._v(", but the commit is performed if either condition is "),a("code",[e._v("true")]),e._v(".")])]),e._v(" "),a("li",[a("p",[a("code",[e._v("MANUAL")]),e._v(": The message listener is responsible to "),a("code",[e._v("acknowledge()")]),e._v(" the "),a("code",[e._v("Acknowledgment")]),e._v(".\nAfter that, the same semantics as "),a("code",[e._v("BATCH")]),e._v(" are applied.")])]),e._v(" "),a("li",[a("p",[a("code",[e._v("MANUAL_IMMEDIATE")]),e._v(": Commit the offset immediately when the "),a("code",[e._v("Acknowledgment.acknowledge()")]),e._v(" method is called by the listener.")])])]),e._v(" "),a("p",[e._v("When using "),a("a",{attrs:{href:"#transactions"}},[e._v("transactions")]),e._v(", the offset(s) are sent to the transaction and the semantics are equivalent to "),a("code",[e._v("RECORD")]),e._v(" or "),a("code",[e._v("BATCH")]),e._v(", depending on the listener type (record or batch).")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[a("code",[e._v("MANUAL")]),e._v(", and "),a("code",[e._v("MANUAL_IMMEDIATE")]),e._v(" require the listener to be an "),a("code",[e._v("AcknowledgingMessageListener")]),e._v(" or a "),a("code",[e._v("BatchAcknowledgingMessageListener")]),e._v("."),a("br"),e._v("See "),a("a",{attrs:{href:"#message-listeners"}},[e._v("Message Listeners")]),e._v(".")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("Depending on the "),a("code",[e._v("syncCommits")]),e._v(" container property, the "),a("code",[e._v("commitSync()")]),e._v(" or "),a("code",[e._v("commitAsync()")]),e._v(" method on the consumer is used."),a("code",[e._v("syncCommits")]),e._v(" is "),a("code",[e._v("true")]),e._v(" by default; also see "),a("code",[e._v("setSyncCommitTimeout")]),e._v(".\nSee "),a("code",[e._v("setCommitCallback")]),e._v(" to get the results of asynchronous commits; the default callback is the "),a("code",[e._v("LoggingCommitCallback")]),e._v(" which logs errors (and successes at debug level).")]),e._v(" "),a("p",[e._v("Because the listener container has it’s own mechanism for committing offsets, it prefers the Kafka "),a("code",[e._v("ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG")]),e._v(" to be "),a("code",[e._v("false")]),e._v(".\nStarting with version 2.3, it unconditionally sets it to false unless specifically set in the consumer factory or the container’s consumer property overrides.")]),e._v(" "),a("p",[e._v("The "),a("code",[e._v("Acknowledgment")]),e._v(" has the following method:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("public interface Acknowledgment {\n\n void acknowledge();\n\n}\n")])])]),a("p",[e._v("This method gives the listener control over when offsets are committed.")]),e._v(" "),a("p",[e._v("Starting with version 2.3, the "),a("code",[e._v("Acknowledgment")]),e._v(" interface has two additional methods "),a("code",[e._v("nack(long sleep)")]),e._v(" and "),a("code",[e._v("nack(int index, long sleep)")]),e._v(".\nThe first one is used with a record listener, the second with a batch listener.\nCalling the wrong method for your listener type will throw an "),a("code",[e._v("IllegalStateException")]),e._v(".")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("If you want to commit a partial batch, using "),a("code",[e._v("nack()")]),e._v(", When using transactions, set the "),a("code",[e._v("AckMode")]),e._v(" to "),a("code",[e._v("MANUAL")]),e._v("; invoking "),a("code",[e._v("nack()")]),e._v(" will send the offsets of the successfully processed records to the transaction.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[a("code",[e._v("nack()")]),e._v(" can only be called on the consumer thread that invokes your listener.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("With a record listener, when "),a("code",[e._v("nack()")]),e._v(" is called, any pending offsets are committed, the remaing records from the last poll are discarded, and seeks are performed on their partitions so that the failed record and unprocessed records are redelivered on the next "),a("code",[e._v("poll()")]),e._v(".\nThe consumer thread can be paused before redelivery, by setting the "),a("code",[e._v("sleep")]),e._v(" argument.\nThis is similar functionality to throwing an exception when the container is configured with a "),a("code",[e._v("DefaultErrorHandler")]),e._v(".")]),e._v(" "),a("p",[e._v("When using a batch listener, you can specify the index within the batch where the failure occurred.\nWhen "),a("code",[e._v("nack()")]),e._v(" is called, offsets will be committed for records before the index and seeks are performed on the partitions for the failed and discarded records so that they will be redelivered on the next "),a("code",[e._v("poll()")]),e._v(".")]),e._v(" "),a("p",[e._v("See "),a("a",{attrs:{href:"#error-handlers"}},[e._v("Container Error Handlers")]),e._v(" for more information.")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("When using partition assignment via group management, it is important to ensure the "),a("code",[e._v("sleep")]),e._v(" argument (plus the time spent processing records from the previous poll) is less than the consumer "),a("code",[e._v("max.poll.interval.ms")]),e._v(" property.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("h6",{attrs:{id:"listener-container-auto-startup"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#listener-container-auto-startup"}},[e._v("#")]),e._v(" Listener Container Auto Startup")]),e._v(" "),a("p",[e._v("The listener containers implement "),a("code",[e._v("SmartLifecycle")]),e._v(", and "),a("code",[e._v("autoStartup")]),e._v(" is "),a("code",[e._v("true")]),e._v(" by default.\nThe containers are started in a late phase ("),a("code",[e._v("Integer.MAX-VALUE - 100")]),e._v(").\nOther components that implement "),a("code",[e._v("SmartLifecycle")]),e._v(", to handle data from listeners, should be started in an earlier phase.\nThe "),a("code",[e._v("- 100")]),e._v(" leaves room for later phases to enable components to be auto-started after the containers.")]),e._v(" "),a("h5",{attrs:{id:"manually-committing-offsets"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#manually-committing-offsets"}},[e._v("#")]),e._v(" Manually Committing Offsets")]),e._v(" "),a("p",[e._v("Normally, when using "),a("code",[e._v("AckMode.MANUAL")]),e._v(" or "),a("code",[e._v("AckMode.MANUAL_IMMEDIATE")]),e._v(", the acknowledgments must be acknowledged in order, because Kafka does not maintain state for each record, only a committed offset for each group/partition.\nStarting with version 2.8, you can now set the container property "),a("code",[e._v("asyncAcks")]),e._v(", which allows the acknowledgments for records returned by the poll to be acknowledged in any order.\nThe listener container will defer the out-of-order commits until the missing acknowledgments are received.\nThe consumer will be paused (no new records delivered) until all the offsets for the previous poll have been committed.")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("While this feature allows applications to process records asynchronously, it should be understood that it increases the possibility of duplicate deliveries after a failure.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("h5",{attrs:{id:"kafkalistener-annotation"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#kafkalistener-annotation"}},[e._v("#")]),e._v(" "),a("code",[e._v("@KafkaListener")]),e._v(" Annotation")]),e._v(" "),a("p",[e._v("The "),a("code",[e._v("@KafkaListener")]),e._v(" annotation is used to designate a bean method as a listener for a listener container.\nThe bean is wrapped in a "),a("code",[e._v("MessagingMessageListenerAdapter")]),e._v(" configured with various features, such as converters to convert the data, if necessary, to match the method parameters.")]),e._v(" "),a("p",[e._v("You can configure most attributes on the annotation with SpEL by using "),a("code",[e._v("#{…}")]),e._v(" or property placeholders ("),a("code",[e._v("${…}")]),e._v(").\nSee the "),a("a",{attrs:{href:"https://docs.spring.io/spring-kafka/api/org/springframework/kafka/annotation/KafkaListener.html",target:"_blank",rel:"noopener noreferrer"}},[e._v("Javadoc"),a("OutboundLink")],1),e._v(" for more information.")]),e._v(" "),a("h6",{attrs:{id:"record-listeners"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#record-listeners"}},[e._v("#")]),e._v(" Record Listeners")]),e._v(" "),a("p",[e._v("The "),a("code",[e._v("@KafkaListener")]),e._v(" annotation provides a mechanism for simple POJO listeners.\nThe following example shows how to use it:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('public class Listener {\n\n @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")\n public void listen(String data) {\n ...\n }\n\n}\n')])])]),a("p",[e._v("This mechanism requires an "),a("code",[e._v("@EnableKafka")]),e._v(" annotation on one of your "),a("code",[e._v("@Configuration")]),e._v(" classes and a listener container factory, which is used to configure the underlying "),a("code",[e._v("ConcurrentMessageListenerContainer")]),e._v(".\nBy default, a bean with name "),a("code",[e._v("kafkaListenerContainerFactory")]),e._v(" is expected.\nThe following example shows how to use "),a("code",[e._v("ConcurrentMessageListenerContainer")]),e._v(":")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("@Configuration\n@EnableKafka\npublic class KafkaConfig {\n\n @Bean\n KafkaListenerContainerFactory>\n kafkaListenerContainerFactory() {\n ConcurrentKafkaListenerContainerFactory factory =\n new ConcurrentKafkaListenerContainerFactory<>();\n factory.setConsumerFactory(consumerFactory());\n factory.setConcurrency(3);\n factory.getContainerProperties().setPollTimeout(3000);\n return factory;\n }\n\n @Bean\n public ConsumerFactory consumerFactory() {\n return new DefaultKafkaConsumerFactory<>(consumerConfigs());\n }\n\n @Bean\n public Map consumerConfigs() {\n Map props = new HashMap<>();\n props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());\n ...\n return props;\n }\n}\n")])])]),a("p",[e._v("Notice that, to set container properties, you must use the "),a("code",[e._v("getContainerProperties()")]),e._v(" method on the factory.\nIt is used as a template for the actual properties injected into the container.")]),e._v(" "),a("p",[e._v("Starting with version 2.1.1, you can now set the "),a("code",[e._v("client.id")]),e._v(" property for consumers created by the annotation.\nThe "),a("code",[e._v("clientIdPrefix")]),e._v(" is suffixed with "),a("code",[e._v("-n")]),e._v(", where "),a("code",[e._v("n")]),e._v(" is an integer representing the container number when using concurrency.")]),e._v(" "),a("p",[e._v("Starting with version 2.2, you can now override the container factory’s "),a("code",[e._v("concurrency")]),e._v(" and "),a("code",[e._v("autoStartup")]),e._v(" properties by using properties on the annotation itself.\nThe properties can be simple values, property placeholders, or SpEL expressions.\nThe following example shows how to do so:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@KafkaListener(id = "myListener", topics = "myTopic",\n autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")\npublic void listen(String data) {\n ...\n}\n')])])]),a("h6",{attrs:{id:"explicit-partition-assignment"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#explicit-partition-assignment"}},[e._v("#")]),e._v(" Explicit Partition Assignment")]),e._v(" "),a("p",[e._v("You can also configure POJO listeners with explicit topics and partitions (and, optionally, their initial offsets).\nThe following example shows how to do so:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@KafkaListener(id = "thing2", topicPartitions =\n { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),\n @TopicPartition(topic = "topic2", partitions = "0",\n partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))\n })\npublic void listen(ConsumerRecord, ?> record) {\n ...\n}\n')])])]),a("p",[e._v("You can specify each partition in the "),a("code",[e._v("partitions")]),e._v(" or "),a("code",[e._v("partitionOffsets")]),e._v(" attribute but not both.")]),e._v(" "),a("p",[e._v("As with most annotation properties, you can use SpEL expressions; for an example of how to generate a large list of partitions, see "),a("a",{attrs:{href:"#tip-assign-all-parts"}},[e._v("[tip-assign-all-parts]")]),e._v(".")]),e._v(" "),a("p",[e._v("Starting with version 2.5.5, you can apply an initial offset to all assigned partitions:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@KafkaListener(id = "thing3", topicPartitions =\n { @TopicPartition(topic = "topic1", partitions = { "0", "1" },\n partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))\n })\npublic void listen(ConsumerRecord, ?> record) {\n ...\n}\n')])])]),a("p",[e._v("The "),a("code",[e._v("*")]),e._v(" wildcard represents all partitions in the "),a("code",[e._v("partitions")]),e._v(" attribute.\nThere must only be one "),a("code",[e._v("@PartitionOffset")]),e._v(" with the wildcard in each "),a("code",[e._v("@TopicPartition")]),e._v(".")]),e._v(" "),a("p",[e._v("In addition, when the listener implements "),a("code",[e._v("ConsumerSeekAware")]),e._v(", "),a("code",[e._v("onPartitionsAssigned")]),e._v(" is now called, even when using manual assignment.\nThis allows, for example, any arbitrary seek operations at that time.")]),e._v(" "),a("p",[e._v("Starting with version 2.6.4, you can specify a comma-delimited list of partitions, or partition ranges:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@KafkaListener(id = "pp", autoStartup = "false",\n topicPartitions = @TopicPartition(topic = "topic1",\n partitions = "0-5, 7, 10-15"))\npublic void process(String in) {\n ...\n}\n')])])]),a("p",[e._v("The range is inclusive; the example above will assign partitions "),a("code",[e._v("0, 1, 2, 3, 4, 5, 7, 10, 11, 12, 13, 14, 15")]),e._v(".")]),e._v(" "),a("p",[e._v("The same technique can be used when specifying initial offsets:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@KafkaListener(id = "thing3", topicPartitions =\n { @TopicPartition(topic = "topic1",\n partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))\n })\npublic void listen(ConsumerRecord, ?> record) {\n ...\n}\n')])])]),a("p",[e._v("The initial offset will be applied to all 6 partitions.")]),e._v(" "),a("h6",{attrs:{id:"manual-acknowledgment"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#manual-acknowledgment"}},[e._v("#")]),e._v(" Manual Acknowledgment")]),e._v(" "),a("p",[e._v("When using manual "),a("code",[e._v("AckMode")]),e._v(", you can also provide the listener with the "),a("code",[e._v("Acknowledgment")]),e._v(".\nThe following example also shows how to use a different container factory.")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@KafkaListener(id = "cat", topics = "myTopic",\n containerFactory = "kafkaManualAckListenerContainerFactory")\npublic void listen(String data, Acknowledgment ack) {\n ...\n ack.acknowledge();\n}\n')])])]),a("h6",{attrs:{id:"consumer-record-metadata"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#consumer-record-metadata"}},[e._v("#")]),e._v(" Consumer Record Metadata")]),e._v(" "),a("p",[e._v("Finally, metadata about the record is available from message headers.\nYou can use the following header names to retrieve the headers of the message:")]),e._v(" "),a("ul",[a("li",[a("p",[a("code",[e._v("KafkaHeaders.OFFSET")])])]),e._v(" "),a("li",[a("p",[a("code",[e._v("KafkaHeaders.RECEIVED_MESSAGE_KEY")])])]),e._v(" "),a("li",[a("p",[a("code",[e._v("KafkaHeaders.RECEIVED_TOPIC")])])]),e._v(" "),a("li",[a("p",[a("code",[e._v("KafkaHeaders.RECEIVED_PARTITION_ID")])])]),e._v(" "),a("li",[a("p",[a("code",[e._v("KafkaHeaders.RECEIVED_TIMESTAMP")])])]),e._v(" "),a("li",[a("p",[a("code",[e._v("KafkaHeaders.TIMESTAMP_TYPE")])])])]),e._v(" "),a("p",[e._v("Starting with version 2.5 the "),a("code",[e._v("RECEIVED_MESSAGE_KEY")]),e._v(" is not present if the incoming record has a "),a("code",[e._v("null")]),e._v(" key; previously the header was populated with a "),a("code",[e._v("null")]),e._v(" value.\nThis change is to make the framework consistent with "),a("code",[e._v("spring-messaging")]),e._v(" conventions where "),a("code",[e._v("null")]),e._v(" valued headers are not present.")]),e._v(" "),a("p",[e._v("The following example shows how to use the headers:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@KafkaListener(id = "qux", topicPattern = "myTopic1")\npublic void listen(@Payload String foo,\n @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key,\n @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,\n @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,\n @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts\n ) {\n ...\n}\n')])])]),a("p",[e._v("Starting with version 2.5, instead of using discrete headers, you can receive record metadata in a "),a("code",[e._v("ConsumerRecordMetadata")]),e._v(" parameter.")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("@KafkaListener(...)\npublic void listen(String str, ConsumerRecordMetadata meta) {\n ...\n}\n")])])]),a("p",[e._v("This contains all the data from the "),a("code",[e._v("ConsumerRecord")]),e._v(" except the key and value.")]),e._v(" "),a("h6",{attrs:{id:"batch-listeners"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#batch-listeners"}},[e._v("#")]),e._v(" Batch Listeners")]),e._v(" "),a("p",[e._v("Starting with version 1.1, you can configure "),a("code",[e._v("@KafkaListener")]),e._v(" methods to receive the entire batch of consumer records received from the consumer poll.\nTo configure the listener container factory to create batch listeners, you can set the "),a("code",[e._v("batchListener")]),e._v(" property.\nThe following example shows how to do so:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("@Bean\npublic KafkaListenerContainerFactory, ?> batchFactory() {\n ConcurrentKafkaListenerContainerFactory factory =\n new ConcurrentKafkaListenerContainerFactory<>();\n factory.setConsumerFactory(consumerFactory());\n factory.setBatchListener(true); // <<<<<<<<<<<<<<<<<<<<<<<<<\n return factory;\n}\n")])])]),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("Starting with version 2.8, you can override the factory’s "),a("code",[e._v("batchListener")]),e._v(" propery using the "),a("code",[e._v("batch")]),e._v(" property on the "),a("code",[e._v("@KafkaListener")]),e._v(" annotation."),a("br"),e._v("This, together with the changes to "),a("a",{attrs:{href:"#error-handlers"}},[e._v("Container Error Handlers")]),e._v(" allows the same factory to be used for both record and batch listeners.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("The following example shows how to receive a list of payloads:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")\npublic void listen(List list) {\n ...\n}\n')])])]),a("p",[e._v("The topic, partition, offset, and so on are available in headers that parallel the payloads.\nThe following example shows how to use the headers:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")\npublic void listen(List list,\n @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List keys,\n @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List partitions,\n @Header(KafkaHeaders.RECEIVED_TOPIC) List topics,\n @Header(KafkaHeaders.OFFSET) List offsets) {\n ...\n}\n')])])]),a("p",[e._v("Alternatively, you can receive a "),a("code",[e._v("List")]),e._v(" of "),a("code",[e._v("Message>")]),e._v(" objects with each offset and other details in each message, but it must be the only parameter (aside from optional "),a("code",[e._v("Acknowledgment")]),e._v(", when using manual commits, and/or "),a("code",[e._v("Consumer, ?>")]),e._v(" parameters) defined on the method.\nThe following example shows how to do so:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")\npublic void listen14(List> list) {\n ...\n}\n\n@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")\npublic void listen15(List> list, Acknowledgment ack) {\n ...\n}\n\n@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")\npublic void listen16(List> list, Acknowledgment ack, Consumer, ?> consumer) {\n ...\n}\n')])])]),a("p",[e._v("No conversion is performed on the payloads in this case.")]),e._v(" "),a("p",[e._v("If the "),a("code",[e._v("BatchMessagingMessageConverter")]),e._v(" is configured with a "),a("code",[e._v("RecordMessageConverter")]),e._v(", you can also add a generic type to the "),a("code",[e._v("Message")]),e._v(" parameter and the payloads are converted.\nSee "),a("a",{attrs:{href:"#payload-conversion-with-batch"}},[e._v("Payload Conversion with Batch Listeners")]),e._v(" for more information.")]),e._v(" "),a("p",[e._v("You can also receive a list of "),a("code",[e._v("ConsumerRecord, ?>")]),e._v(" objects, but it must be the only parameter (aside from optional "),a("code",[e._v("Acknowledgment")]),e._v(", when using manual commits and "),a("code",[e._v("Consumer, ?>")]),e._v(" parameters) defined on the method.\nThe following example shows how to do so:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")\npublic void listen(List> list) {\n ...\n}\n\n@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")\npublic void listen(List> list, Acknowledgment ack) {\n ...\n}\n')])])]),a("p",[e._v("Starting with version 2.2, the listener can receive the complete "),a("code",[e._v("ConsumerRecords, ?>")]),e._v(" object returned by the "),a("code",[e._v("poll()")]),e._v(" method, letting the listener access additional methods, such as "),a("code",[e._v("partitions()")]),e._v(" (which returns the "),a("code",[e._v("TopicPartition")]),e._v(" instances in the list) and "),a("code",[e._v("records(TopicPartition)")]),e._v(" (which gets selective records).\nAgain, this must be the only parameter (aside from optional "),a("code",[e._v("Acknowledgment")]),e._v(", when using manual commits or "),a("code",[e._v("Consumer, ?>")]),e._v(" parameters) on the method.\nThe following example shows how to do so:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")\npublic void pollResults(ConsumerRecords, ?> records) {\n ...\n}\n')])])]),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("If the container factory has a "),a("code",[e._v("RecordFilterStrategy")]),e._v(" configured, it is ignored for "),a("code",[e._v("ConsumerRecords, ?>")]),e._v(" listeners, with a "),a("code",[e._v("WARN")]),e._v(" log message emitted."),a("br"),e._v("Records can only be filtered with a batch listener if the "),a("code",[e._v(">")]),e._v(" form of listener is used."),a("br"),e._v("By default, records are filtered one-at-a-time; starting with version 2.8, you can override "),a("code",[e._v("filterBatch")]),e._v(" to filter the entire batch in one call.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("h6",{attrs:{id:"annotation-properties"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#annotation-properties"}},[e._v("#")]),e._v(" Annotation Properties")]),e._v(" "),a("p",[e._v("Starting with version 2.0, the "),a("code",[e._v("id")]),e._v(" property (if present) is used as the Kafka consumer "),a("code",[e._v("group.id")]),e._v(" property, overriding the configured property in the consumer factory, if present.\nYou can also set "),a("code",[e._v("groupId")]),e._v(" explicitly or set "),a("code",[e._v("idIsGroup")]),e._v(" to false to restore the previous behavior of using the consumer factory "),a("code",[e._v("group.id")]),e._v(".")]),e._v(" "),a("p",[e._v("You can use property placeholders or SpEL expressions within most annotation properties, as the following example shows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@KafkaListener(topics = "${some.property}")\n\n@KafkaListener(topics = "#{someBean.someProperty}",\n groupId = "#{someBean.someProperty}.group")\n')])])]),a("p",[e._v("Starting with version 2.1.2, the SpEL expressions support a special token: "),a("code",[e._v("__listener")]),e._v(".\nIt is a pseudo bean name that represents the current bean instance within which this annotation exists.")]),e._v(" "),a("p",[e._v("Consider the following example:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@Bean\npublic Listener listener1() {\n return new Listener("topic1");\n}\n\n@Bean\npublic Listener listener2() {\n return new Listener("topic2");\n}\n')])])]),a("p",[e._v("Given the beans in the previous example, we can then use the following:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('public class Listener {\n\n private final String topic;\n\n public Listener(String topic) {\n this.topic = topic;\n }\n\n @KafkaListener(topics = "#{__listener.topic}",\n groupId = "#{__listener.topic}.group")\n public void listen(...) {\n ...\n }\n\n public String getTopic() {\n return this.topic;\n }\n\n}\n')])])]),a("p",[e._v("If, in the unlikely event that you have an actual bean called "),a("code",[e._v("__listener")]),e._v(", you can change the expression token byusing the "),a("code",[e._v("beanRef")]),e._v(" attribute.\nThe following example shows how to do so:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@KafkaListener(beanRef = "__x", topics = "#{__x.topic}",\n groupId = "#{__x.topic}.group")\n')])])]),a("p",[e._v("Starting with version 2.2.4, you can specify Kafka consumer properties directly on the annotation, these will override any properties with the same name configured in the consumer factory. You "),a("strong",[e._v("cannot")]),e._v(" specify the "),a("code",[e._v("group.id")]),e._v(" and "),a("code",[e._v("client.id")]),e._v(" properties this way; they will be ignored; use the "),a("code",[e._v("groupId")]),e._v(" and "),a("code",[e._v("clientIdPrefix")]),e._v(" annotation properties for those.")]),e._v(" "),a("p",[e._v("The properties are specified as individual strings with the normal Java "),a("code",[e._v("Properties")]),e._v(" file format: "),a("code",[e._v("foo:bar")]),e._v(", "),a("code",[e._v("foo=bar")]),e._v(", or "),a("code",[e._v("foo bar")]),e._v(".")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@KafkaListener(topics = "myTopic", groupId = "group", properties = {\n "max.poll.interval.ms:60000",\n ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"\n})\n')])])]),a("p",[e._v("The following is an example of the corresponding listeners for the example in "),a("a",{attrs:{href:"#routing-template"}},[e._v("Using "),a("code",[e._v("RoutingKafkaTemplate")])]),e._v(".")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@KafkaListener(id = "one", topics = "one")\npublic void listen1(String in) {\n System.out.println("1: " + in);\n}\n\n@KafkaListener(id = "two", topics = "two",\n properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")\npublic void listen2(byte[] in) {\n System.out.println("2: " + new String(in));\n}\n')])])]),a("h5",{attrs:{id:"obtaining-the-consumer-group-id"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#obtaining-the-consumer-group-id"}},[e._v("#")]),e._v(" Obtaining the Consumer "),a("code",[e._v("group.id")])]),e._v(" "),a("p",[e._v("When running the same listener code in multiple containers, it may be useful to be able to determine which container (identified by its "),a("code",[e._v("group.id")]),e._v(" consumer property) that a record came from.")]),e._v(" "),a("p",[e._v("You can call "),a("code",[e._v("KafkaUtils.getConsumerGroupId()")]),e._v(" on the listener thread to do this.\nAlternatively, you can access the group id in a method parameter.")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@KafkaListener(id = "bar", topicPattern = "${topicTwo:annotated2}", exposeGroupId = "${always:true}")\npublic void listener(@Payload String foo,\n @Header(KafkaHeaders.GROUP_ID) String groupId) {\n...\n}\n')])])]),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("This is available in record listeners and batch listeners that receive a "),a("code",[e._v("List>")]),e._v(" of records."),a("br"),e._v("It is "),a("strong",[e._v("not")]),e._v(" available in a batch listener that receives a "),a("code",[e._v("ConsumerRecords, ?>")]),e._v(" argument."),a("br"),e._v("Use the "),a("code",[e._v("KafkaUtils")]),e._v(" mechanism in that case.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("h5",{attrs:{id:"container-thread-naming"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#container-thread-naming"}},[e._v("#")]),e._v(" Container Thread Naming")]),e._v(" "),a("p",[e._v("Listener containers currently use two task executors, one to invoke the consumer and another that is used to invoke the listener when the kafka consumer property "),a("code",[e._v("enable.auto.commit")]),e._v(" is "),a("code",[e._v("false")]),e._v(".\nYou can provide custom executors by setting the "),a("code",[e._v("consumerExecutor")]),e._v(" and "),a("code",[e._v("listenerExecutor")]),e._v(" properties of the container’s "),a("code",[e._v("ContainerProperties")]),e._v(".\nWhen using pooled executors, be sure that enough threads are available to handle the concurrency across all the containers in which they are used.\nWhen using the "),a("code",[e._v("ConcurrentMessageListenerContainer")]),e._v(", a thread from each is used for each consumer ("),a("code",[e._v("concurrency")]),e._v(").")]),e._v(" "),a("p",[e._v("If you do not provide a consumer executor, a "),a("code",[e._v("SimpleAsyncTaskExecutor")]),e._v(" is used.\nThis executor creates threads with names similar to "),a("code",[e._v("-C-1")]),e._v(" (consumer thread).\nFor the "),a("code",[e._v("ConcurrentMessageListenerContainer")]),e._v(", the "),a("code",[e._v("")]),e._v(" part of the thread name becomes "),a("code",[e._v("-m")]),e._v(", where "),a("code",[e._v("m")]),e._v(" represents the consumer instance."),a("code",[e._v("n")]),e._v(" increments each time the container is started.\nSo, with a bean name of "),a("code",[e._v("container")]),e._v(", threads in this container will be named "),a("code",[e._v("container-0-C-1")]),e._v(", "),a("code",[e._v("container-1-C-1")]),e._v(" etc., after the container is started the first time; "),a("code",[e._v("container-0-C-2")]),e._v(", "),a("code",[e._v("container-1-C-2")]),e._v(" etc., after a stop and subsequent start.")]),e._v(" "),a("h5",{attrs:{id:"kafkalistener-as-a-meta-annotation"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#kafkalistener-as-a-meta-annotation"}},[e._v("#")]),e._v(" "),a("code",[e._v("@KafkaListener")]),e._v(" as a Meta Annotation")]),e._v(" "),a("p",[e._v("Starting with version 2.2, you can now use "),a("code",[e._v("@KafkaListener")]),e._v(" as a meta annotation.\nThe following example shows how to do so:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@Target(ElementType.METHOD)\n@Retention(RetentionPolicy.RUNTIME)\n@KafkaListener\npublic @interface MyThreeConsumersListener {\n\n @AliasFor(annotation = KafkaListener.class, attribute = "id")\n String id();\n\n @AliasFor(annotation = KafkaListener.class, attribute = "topics")\n String[] topics();\n\n @AliasFor(annotation = KafkaListener.class, attribute = "concurrency")\n String concurrency() default "3";\n\n}\n')])])]),a("p",[e._v("You must alias at least one of "),a("code",[e._v("topics")]),e._v(", "),a("code",[e._v("topicPattern")]),e._v(", or "),a("code",[e._v("topicPartitions")]),e._v(" (and, usually, "),a("code",[e._v("id")]),e._v(" or "),a("code",[e._v("groupId")]),e._v(" unless you have specified a "),a("code",[e._v("group.id")]),e._v(" in the consumer factory configuration).\nThe following example shows how to do so:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@MyThreeConsumersListener(id = "my.group", topics = "my.topic")\npublic void listen1(String in) {\n ...\n}\n')])])]),a("h5",{attrs:{id:"kafkalistener-on-a-class"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#kafkalistener-on-a-class"}},[e._v("#")]),e._v(" "),a("code",[e._v("@KafkaListener")]),e._v(" on a Class")]),e._v(" "),a("p",[e._v("When you use "),a("code",[e._v("@KafkaListener")]),e._v(" at the class-level, you must specify "),a("code",[e._v("@KafkaHandler")]),e._v(" at the method level.\nWhen messages are delivered, the converted message payload type is used to determine which method to call.\nThe following example shows how to do so:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@KafkaListener(id = "multi", topics = "myTopic")\nstatic class MultiListenerBean {\n\n @KafkaHandler\n public void listen(String foo) {\n ...\n }\n\n @KafkaHandler\n public void listen(Integer bar) {\n ...\n }\n\n @KafkaHandler(isDefault = true)\n public void listenDefault(Object object) {\n ...\n }\n\n}\n')])])]),a("p",[e._v("Starting with version 2.1.3, you can designate a "),a("code",[e._v("@KafkaHandler")]),e._v(" method as the default method that is invoked if there is no match on other methods.\nAt most, one method can be so designated.\nWhen using "),a("code",[e._v("@KafkaHandler")]),e._v(" methods, the payload must have already been converted to the domain object (so the match can be performed).\nUse a custom deserializer, the "),a("code",[e._v("JsonDeserializer")]),e._v(", or the "),a("code",[e._v("JsonMessageConverter")]),e._v(" with its "),a("code",[e._v("TypePrecedence")]),e._v(" set to "),a("code",[e._v("TYPE_ID")]),e._v(".\nSee "),a("a",{attrs:{href:"#serdes"}},[e._v("Serialization, Deserialization, and Message Conversion")]),e._v(" for more information.")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("Due to some limitations in the way Spring resolves method arguments, a default "),a("code",[e._v("@KafkaHandler")]),e._v(" cannot receive discrete headers; it must use the "),a("code",[e._v("ConsumerRecordMetadata")]),e._v(" as discussed in "),a("a",{attrs:{href:"#consumer-record-metadata"}},[e._v("Consumer Record Metadata")]),e._v(".")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("For example:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("@KafkaHandler(isDefault = true)\npublic void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {\n ...\n}\n")])])]),a("p",[e._v("This won’t work if the object is a "),a("code",[e._v("String")]),e._v("; the "),a("code",[e._v("topic")]),e._v(" parameter will also get a reference to "),a("code",[e._v("object")]),e._v(".")]),e._v(" "),a("p",[e._v("If you need metadata about the record in a default method, use this:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("@KafkaHandler(isDefault = true)\nvoid listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {\n String topic = meta.topic();\n ...\n}\n")])])]),a("h5",{attrs:{id:"kafkalistener-attribute-modification"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#kafkalistener-attribute-modification"}},[e._v("#")]),e._v(" "),a("code",[e._v("@KafkaListener")]),e._v(" Attribute Modification")]),e._v(" "),a("p",[e._v("Starting with version 2.7.2, you can now programmatically modify annotation attributes before the container is created.\nTo do so, add one or more "),a("code",[e._v("KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer")]),e._v(" to the application context."),a("code",[e._v("AnnotationEnhancer")]),e._v(" is a "),a("code",[e._v("BiFunction