# Apache Kafka Support

## [](#kafka)Apache Kafka Support

### [](#overview)Overview

Spring Integration for Apache Kafka is based on the [Spring for Apache Kafka project](https://projects.spring.io/spring-kafka/).

You need to include this dependency into your project:

Maven

```
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>5.5.9</version>
</dependency>
```

Gradle

```
compile "org.springframework.integration:spring-integration-kafka:5.5.9"
```

It provides the following components:

* [Outbound Channel Adapter](#kafka-outbound)
* [Message-driven Channel Adapter](#kafka-inbound)
* [Inbound Channel Adapter](#kafka-inbound-pollable)
* [Outbound Gateway](#kafka-outbound-gateway)
* [Inbound Gateway](#kafka-inbound-gateway)
* [Channels Backed by Apache Kafka Topics](#kafka-channels)

### [](#kafka-outbound)Outbound Channel Adapter

The outbound channel adapter is used to publish messages from a Spring Integration channel to Apache Kafka topics. The channel is defined in the application context and then wired into the application that sends messages to Apache Kafka. Sender applications can publish to Apache Kafka by using Spring Integration messages, which are internally converted to Kafka records by the outbound channel adapter, as follows:

* The payload of the Spring Integration message is used to populate the payload of the Kafka record.
* By default, the `kafka_messageKey` header of the Spring Integration message is used to populate the key of the Kafka record.

You can customize the target topic and partition for publishing the message through the `kafka_topic` and `kafka_partitionId` headers, respectively.

In addition, the `<int-kafka:outbound-channel-adapter>` provides the ability to extract the key, target topic, and target partition by applying SpEL expressions on the outbound message. To that end, it supports three mutually exclusive pairs of attributes:

* `topic` and `topic-expression`
* `message-key` and `message-key-expression`
* `partition-id` and `partition-id-expression`

These let you specify `topic`, `message-key`, and `partition-id`, respectively, as static values on the adapter or dynamically evaluate their values at runtime against the request message.

NOTE: The `KafkaHeaders` interface (provided by `spring-kafka`) contains constants used for interacting with
headers.
The `messageKey` and `topic` default headers now require a `kafka_` prefix.
When migrating from an earlier version that used the old headers, you need to specify `message-key-expression="headers['messageKey']"` and `topic-expression="headers['topic']"` on the `<int-kafka:outbound-channel-adapter>`.
Alternatively, you can change the headers upstream to the new headers from `KafkaHeaders` by using a `<header-enricher>` or a `MessageBuilder`.
If you use constant values, you can also configure them on the adapter by using `topic` and `message-key`.
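As an illustrative sketch of the `MessageBuilder` approach (assuming the pre-migration header names `messageKey` and `topic`), a transformer upstream of the adapter might look like this:

```
// Hypothetical helper (not from the framework): copies the legacy header values
// to the KafkaHeaders constants expected by the adapter, then drops the legacy headers.
public Message<?> adaptLegacyHeaders(Message<?> message) {
    return MessageBuilder.fromMessage(message)
            .setHeader(KafkaHeaders.MESSAGE_KEY, message.getHeaders().get("messageKey"))
            .setHeader(KafkaHeaders.TOPIC, message.getHeaders().get("topic"))
            .removeHeaders("messageKey", "topic")
            .build();
}
```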
NOTE: If the adapter is configured with a topic or message key (either with a constant or expression), those are used and the corresponding header is ignored. If you wish the header to override the configuration, you need to configure it in an expression, such as the following:

```
topic-expression="headers['topic'] != null ? headers['topic'] : 'myTopic'"
```

The adapter requires a `KafkaTemplate`, which, in turn, requires a suitably configured `KafkaProducerFactory`.

If a `send-failure-channel` (`sendFailureChannel`) is provided and a send failure (sync or async) is received, an `ErrorMessage` is sent to the channel. The payload is a `KafkaSendFailureException` with `failedMessage`, `record` (the `ProducerRecord`), and `cause` properties. You can override the `DefaultErrorMessageStrategy` by setting the `error-message-strategy` property.

If a `send-success-channel` (`sendSuccessChannel`) is provided, a message with a payload of type `org.apache.kafka.clients.producer.RecordMetadata` is sent after a successful send.

NOTE: If your application uses transactions and the same channel adapter is used to publish messages where the transaction is started by a listener container, as well as publishing where there is no existing transaction, you must configure a `transactionIdPrefix` on the `KafkaTemplate` to override the prefix used by the container or transaction manager.
The prefix used by container-initiated transactions (the producer factory or transaction manager property) must be the same on all application instances.
The prefix used for producer-only transactions must be unique on all application instances.
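A minimal sketch of such an override (the prefix value and the random instance suffix are illustrative assumptions, not framework requirements):

```
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
    KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
    // Producer-only transactions get a prefix that is unique per application
    // instance; container-initiated transactions keep the factory's prefix.
    template.setTransactionIdPrefix("myApp.producerOnly." + UUID.randomUUID());
    return template;
}
```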
You can configure a `flushExpression`, which must resolve to a boolean value. Flushing after sending several messages might be useful if you are using the `linger.ms` and `batch.size` Kafka producer properties; the expression should evaluate to `Boolean.TRUE` on the last message so that an incomplete batch is sent immediately. By default, the expression looks for a `Boolean` value in the `KafkaIntegrationHeaders.FLUSH` header (`kafka_flush`). The flush occurs if the value is `true` but not if it is `false` or the header is absent.

The `KafkaProducerMessageHandler.sendTimeoutExpression` default has changed from 10 seconds to the `delivery.timeout.ms` Kafka producer property `+ 5000` so that the actual Kafka error after a timeout is propagated to the application, instead of a timeout generated by this framework. This has been changed for consistency, because you may otherwise see unexpected behavior (Spring may time out the send, while it is actually, eventually, successful).

IMPORTANT: That timeout is 120 seconds by default, so you may wish to reduce it to get more timely failures.
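For example, the following sketch requests a flush on the last message of a batch through the default `kafka_flush` header (it assumes `toKafka` is the `MessageChannel` feeding the adapter and `batch` is a `List<String>`):

```
for (int i = 0; i < batch.size(); i++) {
    boolean last = i == batch.size() - 1;
    // The default flushExpression looks for a Boolean in the kafka_flush header;
    // true on the final message forces the incomplete batch out immediately.
    toKafka.send(MessageBuilder.withPayload(batch.get(i))
            .setHeader(KafkaIntegrationHeaders.FLUSH, last)
            .build());
}
```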
#### [](#java-configuration)Java Configuration

The following example shows how to configure the outbound channel adapter for Apache Kafka with Java:

```
@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler handler() throws Exception {
    KafkaProducerMessageHandler<String, String> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setTopicExpression(new LiteralExpression("someTopic"));
    handler.setMessageKeyExpression(new LiteralExpression("someKey"));
    handler.setSuccessChannel(successes());
    handler.setFailureChannel(failures());
    return handler;
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
    // set more properties
    return new DefaultKafkaProducerFactory<>(props);
}
```

#### [](#java-dsl-configuration)Java DSL Configuration

The following example shows how to configure the outbound channel adapter for Apache Kafka with Spring Integration Java DSL:

```
@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
}

@Bean
public IntegrationFlow sendToKafkaFlow() {
    return f -> f
            .split(p -> Stream.generate(() -> p).limit(101).iterator(), null)
            .publishSubscribeChannel(c -> c
                    .subscribe(sf -> sf.handle(
                            kafkaMessageHandler(producerFactory(), TEST_TOPIC1)
                                    .timestampExpression("T(Long).valueOf('1487694048633')"),
                            e -> e.id("kafkaProducer1")))
                    .subscribe(sf -> sf.handle(
                            kafkaMessageHandler(producerFactory(), TEST_TOPIC2)
                                    .timestamp(m -> 1487694048644L),
                            e -> e.id("kafkaProducer2")))
            );
}

@Bean
public DefaultKafkaHeaderMapper mapper() {
    return new DefaultKafkaHeaderMapper();
}

private KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandler(
        ProducerFactory<Integer, String> producerFactory, String topic) {

    return Kafka
            .outboundChannelAdapter(producerFactory)
            .messageKey(m -> m
                    .getHeaders()
                    .get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
            .headerMapper(mapper())
            .partitionId(m -> 10)
            .topicExpression("headers[kafka_topic] ?: '" + topic + "'")
            .configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
}
```

#### [](#xml-configuration)XML Configuration

The following example shows how to configure the Kafka outbound channel adapter with XML (a representative configuration; see the XML schema for all attributes):

```
<int-kafka:outbound-channel-adapter
        id="kafkaOutboundChannelAdapter"
        kafka-template="template"
        auto-startup="false"
        channel="inputToKafka"
        topic="foo"
        sync="false"
        message-key-expression="'bar'"
        send-failure-channel="failures"
        send-success-channel="successes"
        error-message-strategy="ems"
        partition-id-expression="2"/>
```

### [](#kafka-inbound)Message-driven Channel Adapter

The `KafkaMessageDrivenChannelAdapter` (`<int-kafka:message-driven-channel-adapter>`) uses a `spring-kafka` `KafkaMessageListenerContainer` or `ConcurrentMessageListenerContainer`.

Also, the `mode` attribute is available. It can accept values of `record` or `batch` (default: `record`). For `record` mode, each message payload is converted from a single `ConsumerRecord`. For `batch` mode, the payload is a list of objects that are converted from all the `ConsumerRecord` instances returned by the consumer poll. As with the batched `@KafkaListener`, the `KafkaHeaders.RECEIVED_MESSAGE_KEY`, `KafkaHeaders.RECEIVED_PARTITION_ID`, `KafkaHeaders.RECEIVED_TOPIC`, and `KafkaHeaders.OFFSET` headers are also lists, with positions corresponding to the position in the payload.

Received messages have certain headers populated. See the [`KafkaHeaders` class](https://docs.spring.io/spring-kafka/api/org/springframework/kafka/support/KafkaHeaders.html) for more information.

NOTE: The `Consumer` object (in the `kafka_consumer` header) is not thread-safe.
You must invoke its methods only on the thread that calls the listener within the adapter.
If you hand off the message to another thread, you must not call its methods.

When a `retry-template` is provided, delivery failures are retried according to its retry policy. An `error-channel` is not allowed in this case. You can use the `recovery-callback` to handle the error when retries are exhausted. In most cases, this is an `ErrorMessageSendingRecoverer` that sends the `ErrorMessage` to a channel.

When building an `ErrorMessage` (for use in the `error-channel` or `recovery-callback`), you can customize the error message by setting the `error-message-strategy` property. By default, a `RawRecordHeaderErrorMessageStrategy` is used, to provide access to the converted message as well as the raw `ConsumerRecord`.

#### [](#java-configuration-2)Java Configuration

The following example shows how to configure a message-driven channel adapter with Java:

```
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
        adapter(KafkaMessageListenerContainer<String, String> container) {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
            new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
    kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
    return kafkaMessageDrivenChannelAdapter;
}

@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
    ContainerProperties properties = new ContainerProperties(this.topic);
    // set more properties
    return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
    // set more properties
    return new DefaultKafkaConsumerFactory<>(props);
}
```

#### [](#java-dsl-configuration-2)Java DSL Configuration

The following example shows how to configure a message-driven channel adapter with the Spring Integration Java DSL:

```
@Bean
public IntegrationFlow topic1ListenerFromKafkaFlow() {
    return IntegrationFlows
            .from(Kafka.messageDrivenChannelAdapter(consumerFactory(),
                    KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC1)
                    .configureListenerContainer(c ->
                            c.ackMode(AbstractMessageListenerContainer.AckMode.MANUAL)
                                    .id("topic1ListenerContainer"))
                    .recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel(),
                            new RawRecordHeaderErrorMessageStrategy()))
                    .retryTemplate(new RetryTemplate())
                    .filterInRetry(true))
            .filter(Message.class, m ->
                            m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer.class) < 101,
                    f -> f.throwExceptionOnRejection(true))
            .transform(String::toUpperCase)
            .channel(c -> c.queue("listeningFromKafkaResults1"))
            .get();
}
```

You can also use the container factory that is used for `@KafkaListener` annotations to create `ConcurrentMessageListenerContainer` instances for other purposes. See [the Spring for Apache Kafka documentation](https://docs.spring.io/spring-kafka/docs/current/reference/html/) for an example; a sketch of the idea follows.
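The following is a minimal sketch only (it assumes a `ConcurrentKafkaListenerContainerFactory` bean and illustrative topic and channel names):

```
@Bean
public KafkaMessageDrivenChannelAdapter<String, String> adapterFromFactory(
        ConcurrentKafkaListenerContainerFactory<String, String> factory) {

    // createContainer() reuses the consumer configuration from the
    // @KafkaListener container factory for the given topic(s).
    ConcurrentMessageListenerContainer<String, String> container =
            factory.createContainer("someOtherTopic");
    KafkaMessageDrivenChannelAdapter<String, String> adapter =
            new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
    adapter.setOutputChannel(fromKafka());
    return adapter;
}
```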
With the Java DSL, the container does not have to be configured as a `@Bean`, because the DSL registers the container as a bean. The following example shows how to do so:

```
@Bean
public IntegrationFlow topic2ListenerFromKafkaFlow() {
    return IntegrationFlows
            .from(Kafka.messageDrivenChannelAdapter(
                    kafkaListenerContainerFactory().createContainer(TEST_TOPIC2),
                    KafkaMessageDrivenChannelAdapter.ListenerMode.record)
                    .id("topic2Adapter"))
            ...
            .get();
}
```

Notice that, in this case, the adapter is given an `id` (`topic2Adapter`). The container is registered in the application context with a name of `topic2Adapter.container`. If the adapter does not have an `id` property, the container's bean name is the container's fully qualified class name plus `#n`, where `n` is incremented for each container.

#### [](#xml-configuration-2)XML Configuration

The following example shows how to configure a message-driven channel adapter with XML (a representative configuration; see the XML schema for all attributes):

```
<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        mode="record"
        retry-template="template"
        recovery-callback="callback"
        error-message-strategy="ems"
        channel="someChannel"
        error-channel="errorChannel"/>
```

### [](#kafka-inbound-pollable)Inbound Channel Adapter

The `KafkaMessageSource` provides a pollable channel adapter implementation.

#### [](#java-configuration-3)Java Configuration

```
@InboundChannelAdapter(channel = "fromKafka", poller = @Poller(fixedDelay = "5000"))
@Bean
public KafkaMessageSource<String, String> source(ConsumerFactory<String, String> cf) {
    KafkaMessageSource<String, String> source = new KafkaMessageSource<>(cf, "myTopic");
    source.setGroupId("myGroupId");
    source.setClientId("myClientId");
    return source;
}
```

Refer to the javadocs for available properties.

By default, `max.poll.records` must be either explicitly set in the consumer factory, or it is forced to 1 if the consumer factory is a `DefaultKafkaConsumerFactory`. You can set the property `allowMultiFetch` to `true` to override this behavior.

NOTE: You must poll the consumer within `max.poll.interval.ms` to avoid a rebalance.
If you set `allowMultiFetch` to `true`, you must process all the retrieved records, and poll again, within `max.poll.interval.ms`.

Messages emitted by this adapter contain a header `kafka_remainingRecords` with a count of records remaining from the previous poll.

#### [](#java-dsl-configuration-3)Java DSL Configuration

```
@Bean
public IntegrationFlow flow(ConsumerFactory<String, String> cf) {
    return IntegrationFlows.from(Kafka.inboundChannelAdapter(cf, "myTopic")
                    .groupId("myDslGroupId"),
                e -> e.poller(Pollers.fixedDelay(5000)))
            .handle(System.out::println)
            .get();
}
```

#### [](#xml-configuration-3)XML Configuration

The following is a representative configuration; see the XML schema for all attributes:

```
<int-kafka:inbound-channel-adapter
        id="adapter1"
        consumer-factory="consumerFactory"
        channel="inbound"
        group-id="group"
        client-id="client"
        topics="topic1">
    <int:poller fixed-delay="5000"/>
</int-kafka:inbound-channel-adapter>
```

### [](#kafka-outbound-gateway)Outbound Gateway

The outbound gateway is for request/reply operations. It differs from most Spring Integration gateways in that the sending thread does not block in the gateway, and the reply is processed on the reply listener container thread. If your code invokes the gateway behind a synchronous [Messaging Gateway](https://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#gateway), the user thread blocks there until the reply is received (or a timeout occurs).

NOTE: The gateway does not accept requests until the reply container has been assigned its topics and partitions. It is suggested that you add a `ConsumerRebalanceListener` to the template's reply container properties and wait for the `onPartitionsAssigned` call before sending messages to the gateway. The sketch below shows one way to do that.
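This is a hedged sketch only, assuming access to the reply container before it starts and a latch the sending code can wait on:

```
// Block request senders until the reply container has its partitions assigned.
CountDownLatch assigned = new CountDownLatch(1);
replyContainer.getContainerProperties()
        .setConsumerRebalanceListener(new ConsumerRebalanceListener() {

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // no action needed for this example
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        assigned.countDown(); // safe to send requests to the gateway now
    }

});
// before sending the first request:
assigned.await(30, TimeUnit.SECONDS);
```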
The `KafkaProducerMessageHandler` `sendTimeoutExpression` default is the `delivery.timeout.ms` Kafka producer property `+ 5000` so that the actual Kafka error after a timeout is propagated to the application, instead of a timeout generated by this framework. This has been changed for consistency, because you may otherwise see unexpected behavior (Spring may time out the send, while it is actually, eventually, successful).

IMPORTANT: That timeout is 120 seconds by default, so you may wish to reduce it to get more timely failures.

#### [](#java-configuration-4)Java Configuration

The following example shows how to configure a gateway with Java:

```
@Bean
@ServiceActivator(inputChannel = "kafkaRequests", outputChannel = "kafkaReplies")
public KafkaProducerMessageHandler<String, String> outGateway(
        ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
    return new KafkaProducerMessageHandler<>(kafkaTemplate);
}
```

Refer to the javadocs for available properties.

Notice that the same class as the [outbound channel adapter](#kafka-outbound) is used, the only difference being that the `KafkaTemplate` passed into the constructor is a `ReplyingKafkaTemplate`. See [the Spring for Apache Kafka documentation](https://docs.spring.io/spring-kafka/docs/current/reference/html/) for more information.

The outbound topic, partition, key, and so on are determined in the same way as the outbound adapter. The reply topic is determined as follows:

1. A message header named `KafkaHeaders.REPLY_TOPIC` (if present, it must have a `String` or `byte[]` value) is validated against the template's reply container's subscribed topics.
2. If the template's `replyContainer` is subscribed to only one topic, it is used.

You can also specify a `KafkaHeaders.REPLY_PARTITION` header to determine a specific partition to be used for replies. Again, this is validated against the template's reply container's subscriptions. (A sketch of setting these headers appears at the end of this section.)

#### [](#java-dsl-configuration-4)Java DSL Configuration

The following example shows how to configure an outbound gateway with the Java DSL:

```
@Bean
public IntegrationFlow outboundGateFlow(
        ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
    return IntegrationFlows.from("kafkaRequests")
            .handle(Kafka.outboundGateway(kafkaTemplate))
            .channel("kafkaReplies")
            .get();
}
```

Alternatively, you can use a configuration similar to the following bean:

```
@Bean
public IntegrationFlow outboundGateFlow() {
    return IntegrationFlows.from("kafkaRequests")
            .handle(Kafka.outboundGateway(producerFactory(), replyContainer())
                    .configureKafkaTemplate(t -> t.replyTimeout(30_000)))
            .channel("kafkaReplies")
            .get();
}
```

#### [](#xml-configuration-4)XML Configuration

The following is a representative configuration; see the XML schema for all attributes:

```
<int-kafka:outbound-gateway
        id="gateway"
        kafka-template="template"
        request-channel="kafkaRequests"
        reply-channel="kafkaReplies"
        topic="topic"
        message-key-expression="'key'"/>
```
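As a hedged illustration of the reply headers (the channel and topic names are assumptions), a request message might be built like this:

```
// KafkaHeaders.REPLY_TOPIC (and, optionally, REPLY_PARTITION) are validated
// against the template's reply container subscriptions before the send.
Message<String> request = MessageBuilder.withPayload("someRequest")
        .setHeader(KafkaHeaders.REPLY_TOPIC, "kafkaRepliesTopic")
        .setHeader(KafkaHeaders.REPLY_PARTITION, 0)
        .build();
kafkaRequests.send(request);
```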
### [](#kafka-inbound-gateway)Inbound Gateway

The inbound gateway is for request/reply operations. The following example shows how to configure an inbound gateway with Java:

```
@Bean
public KafkaInboundGateway<Integer, String, String> inboundGateway(
        AbstractMessageListenerContainer<Integer, String> container,
        KafkaTemplate<Integer, String> replyTemplate) {

    KafkaInboundGateway<Integer, String, String> gateway =
            new KafkaInboundGateway<>(container, replyTemplate);
    gateway.setRequestChannel(requests);
    gateway.setReplyChannel(replies);
    gateway.setReplyTimeout(30_000);
    return gateway;
}
```

Refer to the javadocs for available properties.

The following example shows how to configure a simple upper-case converter with the Java DSL:

```
@Bean
public IntegrationFlow serverGateway(
        ConcurrentMessageListenerContainer<Integer, String> container,
        KafkaTemplate<Integer, String> replyTemplate) {
    return IntegrationFlows
            .from(Kafka.inboundGateway(container, replyTemplate)
                    .replyTimeout(30_000))
            .transform(String::toUpperCase)
            .get();
}
```

Alternatively, you could configure an upper-case converter by using code similar to the following:

```
@Bean
public IntegrationFlow serverGateway() {
    return IntegrationFlows
            .from(Kafka.inboundGateway(consumerFactory(), containerProperties(),
                    producerFactory())
                    .replyTimeout(30_000))
            .transform(String::toUpperCase)
            .get();
}
```

You can also use the container factory that is used for `@KafkaListener` annotations to create `ConcurrentMessageListenerContainer` instances for other purposes. See [the Spring for Apache Kafka documentation](https://docs.spring.io/spring-kafka/docs/current/reference/html/) and [Message-driven Channel Adapter](#kafka-inbound) for examples.

#### [](#xml-configuration-5)XML Configuration

The following is a representative configuration:

```
<int-kafka:inbound-gateway
        id="gateway1"
        listener-container="container1"
        kafka-template="template"
        auto-startup="false"
        request-channel="requests"
        reply-channel="replies"
        request-timeout="5000"/>
```

See the XML schema for a description of each property.

### [](#kafka-channels)Channels Backed by Apache Kafka Topics

Spring Integration has `MessageChannel` implementations backed by an Apache Kafka topic for persistence.

Each channel requires a `KafkaTemplate` for the sending side and either a listener container factory (for subscribable channels) or a `KafkaMessageSource` for a pollable channel.

#### [](#java-dsl-configuration-5)Java DSL Configuration

```
@Bean
public IntegrationFlow flowWithSubscribable(KafkaTemplate template,
        ConcurrentKafkaListenerContainerFactory containerFactory) {

    return IntegrationFlows.from(...)
            ...
            .channel(Kafka.channel(template, containerFactory, "someTopic1").groupId("group1"))
            ...
            .get();
}

@Bean
public IntegrationFlow flowWithPubSub(KafkaTemplate template,
        ConcurrentKafkaListenerContainerFactory containerFactory) {

    return IntegrationFlows.from(...)
            ...
            .publishSubscribeChannel(pubSub(template, containerFactory),
                    pubsub -> pubsub
                            .subscribe(subflow -> ...)
                            .subscribe(subflow -> ...))
            .get();
}

@Bean
public BroadcastCapableChannel pubSub(KafkaTemplate template,
        ConcurrentKafkaListenerContainerFactory containerFactory) {

    return Kafka.publishSubscribeChannel(template, containerFactory, "someTopic2")
            .groupId("group2")
            .get();
}

@Bean
public IntegrationFlow flowWithPollable(KafkaTemplate template,
        KafkaMessageSource source) {

    return IntegrationFlows.from(...)
            ...
            .channel(Kafka.pollableChannel(template, source, "someTopic3").groupId("group3"))
            .handle(..., e -> e.poller(...))
            ...
            .get();
}
```

#### [](#java-configuration-5)Java Configuration

```
/**
 * Channel for a single subscriber.
 */
@Bean
SubscribableKafkaChannel pointToPoint(KafkaTemplate template, KafkaListenerContainerFactory factory) {
    SubscribableKafkaChannel channel = new SubscribableKafkaChannel(template, factory, "topicA");
    channel.setGroupId("group1");
    return channel;
}

/**
 * Channel for multiple subscribers.
 */
@Bean
SubscribableKafkaChannel pubsub(KafkaTemplate template, KafkaListenerContainerFactory factory) {
    SubscribableKafkaChannel channel = new SubscribableKafkaChannel(template, factory, "topicB", true);
    channel.setGroupId("group2");
    return channel;
}

/**
 * Pollable channel (topic is configured on the source).
 */
@Bean
PollableKafkaChannel pollable(KafkaTemplate template, KafkaMessageSource source) {
    PollableKafkaChannel channel = new PollableKafkaChannel(template, source);
    channel.setGroupId("group3");
    return channel;
}
```

#### [](#xml-configuration-6)XML Configuration

The following are representative configurations; see the XML schema for all attributes:

```
<int-kafka:channel kafka-template="template" id="ptp" topic="ptpTopic"
                   group-id="ptpGroup" container-factory="containerFactory"/>

<int-kafka:publish-subscribe-channel kafka-template="template" id="pubSub" topic="pubSubTopic"
                                     group-id="pubSubGroup" container-factory="containerFactory"/>

<int-kafka:pollable-channel kafka-template="template" id="pollable" message-source="source"/>
```

### [](#message-conversion)Message Conversion

A `StringJsonMessageConverter` is provided. See [the Spring for Apache Kafka documentation](https://docs.spring.io/spring-kafka/docs/current/reference/html/) for more information.

When using this converter with a message-driven channel adapter, you can specify the type to which you want the incoming payload to be converted. This is achieved by setting the `payload-type` attribute (`payloadType` property) on the adapter. The following example shows how to do so in XML configuration (a representative configuration):

```
<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container"
        channel="someChannel"
        message-converter="messageConverter"
        payload-type="com.example.Foo"
        error-channel="errorChannel"/>
```

The following example shows how to set the `payload-type` attribute (`payloadType` property) on the adapter in Java configuration:

```
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
        adapter(KafkaMessageListenerContainer<String, String> container) {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
            new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
    kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
    kafkaMessageDrivenChannelAdapter.setMessageConverter(converter());
    kafkaMessageDrivenChannelAdapter.setPayloadType(Foo.class);
    return kafkaMessageDrivenChannelAdapter;
}
```

### [](#kafka-tombstones)Null Payloads and Log Compaction 'Tombstone' Records

Spring Messaging `Message` objects cannot have `null` payloads. When you use the endpoints for Apache Kafka, `null` payloads (also known as tombstone records) are represented by a payload of type `KafkaNull`. See [the Spring for Apache Kafka documentation](https://docs.spring.io/spring-kafka/docs/current/reference/html/) for more information.

The POJO methods for Spring Integration endpoints can use a true `null` value instead of `KafkaNull`. To do so, mark the parameter with `@Payload(required = false)`. The following example shows how to do so:

```
@ServiceActivator(inputChannel = "fromSomeKafkaInboundEndpoint")
public void in(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
               @Payload(required = false) Customer customer) {
    // customer is null if a tombstone record
    ...
}
```

### [](#streams-integration)Calling a Spring Integration flow from a `KStream`

You can use a `MessagingTransformer` to invoke an integration flow from a `KStream`:

```
@Bean
public KStream kStream(StreamsBuilder kStreamBuilder, MessagingTransformer transformer) {
    KStream stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    stream.mapValues((ValueMapper) String::toUpperCase)
            ...
            .transform(() -> transformer)
            .to(streamingTopic2);

    stream.print(Printed.toSysOut());
    return stream;
}

@Bean
@DependsOn("flow")
public MessagingTransformer transformer(MessagingFunction function) {
    MessagingMessageConverter converter = new MessagingMessageConverter();
    converter.setHeaderMapper(new SimpleKafkaHeaderMapper("*"));
    return new MessagingTransformer<>(function, converter);
}

@Bean
public IntegrationFlow flow() {
    return IntegrationFlows.from(MessagingFunction.class)
            ...
            .get();
}
```

When an integration flow starts with an interface, the proxy that is created has the name of the flow bean, appended with ".gateway", so this bean name can be used as a `@Qualifier` if needed.

### [](#read-process-write)Performance Considerations for read/process/write Scenarios

Many applications consume from a topic, perform some processing, and write to another topic. In most cases, if the write fails, the application would want to throw an exception so that the incoming request can be retried and/or sent to a dead letter topic. This functionality is supported by the underlying message listener container, together with a suitably configured error handler. However, in order to support this, we need to block the listener thread until the success (or failure) of the write operation, so that any exceptions can be thrown to the container. When consuming single records, this is achieved by setting the `sync` property on the outbound adapter. However, when consuming batches, using `sync` causes a significant performance degradation, because the application would wait for the result of each send before sending the next message. You can also perform multiple sends and then wait for the results of those sends afterwards. This is achieved by adding a `futuresChannel` to the message handler. To enable the feature, add `KafkaIntegrationHeaders.FUTURE_TOKEN` to the outbound messages; this can then be used to correlate a `Future` to a particular sent message. Here is an example of how you might use this feature:

```
@SpringBootApplication
public class FuturesChannelApplication {

    public static void main(String[] args) {
        SpringApplication.run(FuturesChannelApplication.class, args);
    }

    @Bean
    IntegrationFlow inbound(ConsumerFactory<String, String> consumerFactory, Handler handler) {
        return IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(consumerFactory,
                    ListenerMode.batch, "inTopic"))
                .handle(handler)
                .get();
    }

    @Bean
    IntegrationFlow outbound(KafkaTemplate<String, String> kafkaTemplate) {
        return IntegrationFlows.from(Gate.class)
                .enrichHeaders(h -> h
                        .header(KafkaHeaders.TOPIC, "outTopic")
                        .headerExpression(KafkaIntegrationHeaders.FUTURE_TOKEN, "headers[id]"))
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                        .futuresChannel("futures"))
                .get();
    }

    @Bean
    PollableChannel futures() {
        return new QueueChannel();
    }

}

@Component
@DependsOn("outbound")
class Handler {

    @Autowired
    Gate gate;

    @Autowired
    PollableChannel futures;

    public void handle(List<String> input) throws Exception {
        System.out.println(input);
        input.forEach(str -> this.gate.send(str.toUpperCase()));
        for (int i = 0; i < input.size(); i++) {
            Message<?> future = this.futures.receive(10000);
            ((Future<?>) future.getPayload()).get(10, TimeUnit.SECONDS);
        }
    }

}

interface Gate {

    void send(String out);

}
```