# Apache Kafka 支持
## Apache Kafka 支持
### 概述
Spring Apache Kafka 的集成基于[Spring for Apache Kafka project](https://projects.spring.io/spring-kafka/)。
你需要在项目中包含此依赖项:
Maven
```
org.springframework.integration
spring-integration-kafka
5.5.9
```
Gradle
```
compile "org.springframework.integration:spring-integration-kafka:5.5.9"
```
它提供了以下组件:
* [出站通道适配器](#kafka-outbound)
* [消息驱动通道适配器](#kafka-inbound)
* [入站通道适配器](#kafka-inbound-pollable)
* [出站网关](#kafka-outbound-gateway)
* [入站网关](#kafka-inbound-gateway)
* [Apache Kafka 主题支持的频道](#kafka-channels)
### 出站通道适配器
出站通道适配器用于将消息从 Spring 集成通道发布到 Apache Kafka 主题。通道在应用程序上下文中定义,然后连接到将消息发送到 Apache Kafka 的应用程序中。发送方应用程序可以通过使用 Spring 集成消息发布到 Apache Kafka,这些消息由出站通道适配器在内部转换为 Kafka 记录,如下所示:
* Spring 集成消息的有效负载用于填充 Kafka 记录的有效负载。
* 默认情况下, Spring 集成消息的`kafka_messageKey`头用于填充 Kafka 记录的键。
你可以分别通过`kafka_topic`和`kafka_partitionId`标题定制用于发布消息的目标主题和分区。
此外,``提供了通过在出站消息上应用 SPEL 表达式来提取密钥、目标主题和目标分区的能力。为此,它支持三对相互排斥的属性:
* `topic`和`topic-expression`
* `message-key`和`message-key-expression`
* `partition-id`和`partition-id-expression`
它们允许你分别将`topic`、`message-key`和`partition-id`指定为适配器上的静态值,或者在运行时根据请求消息动态计算它们的值。
| |`KafkaHeaders`接口(由`spring-kafka`提供)包含用于与
头部交互的常量。
`messageKey`和`topic`默认头部现在需要一个`kafka_`前缀。
从使用旧头部的早期版本迁移时,你需要在``上指定`message-key-expression="headers['messageKey']"`和`topic-expression="headers['topic']"`。
或者,你可以通过使用``或`MessageBuilder`将上游的头更改为新的头,如果你使用常量值,你还可以使用`topic`和`message-key`在适配器上配置它们。|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
注意:如果适配器配置了主题或消息键(使用常量或表达式),则使用这些选项,并忽略相应的头。如果希望标头覆盖配置,则需要在表达式中对其进行配置,如以下所示:
```
topic-expression="headers['topic'] != null ? headers['topic'] : 'myTopic'"
```
适配器需要`KafkaTemplate`,而这又需要适当配置的`KafkaProducerFactory`。
如果提供了`send-failure-channel`(`sendFailureChannel`)并接收到发送失败(同步或异步),则将向通道发送`ErrorMessage`。有效负载是一个`KafkaSendFailureException`,带有`failedMessage`,`record`(the`ProducerRecord`)和`cause`属性。通过设置`error-message-strategy`属性,可以覆盖`DefaultErrorMessageStrategy`。
如果提供了`send-success-channel`(`sendSuccessChannel`),则在成功发送后将发送有效负载类型为`org.apache.kafka.clients.producer.RecordMetadata`的消息。
| |如果你的应用程序使用事务,并且在由侦听器容器启动事务的地方使用相同的通道适配器来发布消息,以及在不存在事务的地方发布消息,你必须在`KafkaTemplate`上配置`transactionIdPrefix`以覆盖容器或事务管理器使用的前缀。
容器发起的事务(生产者工厂或事务管理器属性)使用的前缀必须在所有应用程序实例上都相同。
使用的前缀对于仅用于生产者的事务,必须在所有应用程序实例上都是唯一的。|
|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
你可以配置一个`flushExpression`,它必须解析为布尔值。如果你使用`linger.ms`和`batch.size`Kafka Producer 属性,则在发送多条消息后进行刷新可能会很有用;在最后一条消息上,表达式应该计算为`Boolean.TRUE`,并且将立即发送一个不完整的批处理。默认情况下,表达式在`KafkaIntegrationHeaders.FLUSH`报头(`kafka_flush`)中查找`Boolean`值。如果值`true`而不是`false`或者不存在标头,就会发生刷新。
`KafkaProducerMessageHandler.sendTimeoutExpression`默认值已从 10 秒更改为`delivery.timeout.ms`Kafka Producer 属性`+ 5000`,以便将超时后的实际 Kafka 错误传播到应用程序,而不是由此框架生成的超时。为了保持一致性,对此进行了更改,因为你可能会遇到意外的行为( Spring 可能会超时发送,而实际上最终是成功的)。重要提示:默认情况下,超时时间为 120 秒,因此你可能希望减少超时时间,以获得更多的及时故障。
#### Java 配置
下面的示例展示了如何使用 Java 为 Apache Kafka 配置出站通道适配器:
```
@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler handler() throws Exception {
KafkaProducerMessageHandler handler =
new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setTopicExpression(new LiteralExpression("someTopic"));
handler.setMessageKeyExpression(new LiteralExpression("someKey"));
handler.setSuccessChannel(successes());
handler.setFailureChannel(failures());
return handler;
}
@Bean
public KafkaTemplate kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory producerFactory() {
Map props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
// set more properties
return new DefaultKafkaProducerFactory<>(props);
}
```
#### Java DSL 配置
下面的示例展示了如何使用 Spring 集成 Java DSL 为 Apache Kafka 配置出站通道适配器:
```
@Bean
public ProducerFactory producerFactory() {
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
}
@Bean
public IntegrationFlow sendToKafkaFlow() {
return f -> f
.split(p -> Stream.generate(() -> p).limit(101).iterator(), null)
.publishSubscribeChannel(c -> c
.subscribe(sf -> sf.handle(
kafkaMessageHandler(producerFactory(), TEST_TOPIC1)
.timestampExpression("T(Long).valueOf('1487694048633')"),
e -> e.id("kafkaProducer1")))
.subscribe(sf -> sf.handle(
kafkaMessageHandler(producerFactory(), TEST_TOPIC2)
.timestamp(m -> 1487694048644L),
e -> e.id("kafkaProducer2")))
);
}
@Bean
public DefaultKafkaHeaderMapper mapper() {
return new DefaultKafkaHeaderMapper();
}
private KafkaProducerMessageHandlerSpec kafkaMessageHandler(
ProducerFactory producerFactory, String topic) {
return Kafka
.outboundChannelAdapter(producerFactory)
.messageKey(m -> m
.getHeaders()
.get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
.headerMapper(mapper())
.partitionId(m -> 10)
.topicExpression("headers[kafka_topic] ?: '" + topic + "'")
.configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
}
```
#### XML 配置
下面的示例展示了如何使用 XML 配置 Kafka 出站通道适配器:
```
```
### 消息驱动通道适配器
`KafkaMessageDrivenChannelAdapter`(``)使用`spring-kafka``KafkaMessageListenerContainer`或`ConcurrentListenerContainer`。
而且`mode`属性也是可用的。它可以接受`record`或`batch`的值(默认:`record`)。对于`record`模式,每个消息有效负载都是从单个`ConsumerRecord`转换而来的。对于`batch`模式,有效负载是由消费者投票返回的所有`ConsumerRecord`实例转换的对象列表。与批处理的`@KafkaListener`一样,`KafkaHeaders.RECEIVED_MESSAGE_KEY`、`KafkaHeaders.RECEIVED_PARTITION_ID`、`KafkaHeaders.RECEIVED_TOPIC`和`KafkaHeaders.OFFSET`标题也是列表,其位置与有效载荷中的位置相对应。
接收到的消息有特定的标题填充。有关更多信息,请参见[`KafkaHeaders`类](https://DOCS. Spring.io/ Spring-kafka/api/org/springframework/kafka/support/kafkaheaders.html)。
| |`Consumer`对象(在`kafka_consumer`标头中)不是线程安全的。
你必须仅在调用适配器中的侦听器的线程上调用它的方法。
如果将消息传递给另一个线程,则不能调用它的方法。|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
当提供了`retry-template`时,将根据其重试策略重试交付失败。在这种情况下,`error-channel`是不允许的。当重试用完时,可以使用`recovery-callback`来处理错误。在大多数情况下,这是一个`ErrorMessageSendingRecoverer`,它将`ErrorMessage`发送到一个通道。
在构建`ErrorMessage`(用于`error-channel`或`recovery-callback`)时,可以通过设置`error-message-strategy`属性来定制错误消息。默认情况下,将使用`RawRecordHeaderErrorMessageStrategy`,以提供对转换后的消息以及原始`ConsumerRecord`的访问。
#### Java 配置
下面的示例展示了如何使用 Java 配置消息驱动的通道适配器:
```
@Bean
public KafkaMessageDrivenChannelAdapter
adapter(KafkaMessageListenerContainer container) {
KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
return kafkaMessageDrivenChannelAdapter;
}
@Bean
public KafkaMessageListenerContainer container() throws Exception {
ContainerProperties properties = new ContainerProperties(this.topic);
// set more properties
return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
}
@Bean
public ConsumerFactory consumerFactory() {
Map props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
// set more properties
return new DefaultKafkaConsumerFactory<>(props);
}
```
#### Java DSL 配置
下面的示例展示了如何使用 Spring 集成 Java DSL 配置消息驱动通道适配器:
```
@Bean
public IntegrationFlow topic1ListenerFromKafkaFlow() {
return IntegrationFlows
.from(Kafka.messageDrivenChannelAdapter(consumerFactory(),
KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC1)
.configureListenerContainer(c ->
c.ackMode(AbstractMessageListenerContainer.AckMode.MANUAL)
.id("topic1ListenerContainer"))
.recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel(),
new RawRecordHeaderErrorMessageStrategy()))
.retryTemplate(new RetryTemplate())
.filterInRetry(true))
.filter(Message.class, m ->
m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer.class) < 101,
f -> f.throwExceptionOnRejection(true))
.transform(String::toUpperCase)
.channel(c -> c.queue("listeningFromKafkaResults1"))
.get();
}
```
还可以使用用于`@KafkaListener`注释的容器工厂为其他目的创建`ConcurrentMessageListenerContainer`实例。有关示例,请参见[the Spring for Apache Kafka documentation](https://docs.spring.io/spring-kafka/docs/current/reference/html/)。
对于 Java DSL,容器不必配置为`@Bean`,因为 DSL 将容器注册为 Bean。下面的示例展示了如何做到这一点:
```
@Bean
public IntegrationFlow topic2ListenerFromKafkaFlow() {
return IntegrationFlows
.from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainerFactory().createContainer(TEST_TOPIC2),
KafkaMessageDrivenChannelAdapter.ListenerMode.record)
.id("topic2Adapter"))
...
get();
}
```
注意,在这种情况下,适配器被赋予`id`(`topic2Adapter`)。容器以`topic2Adapter.container`的名称注册在应用程序上下文中。如果适配器没有`id`属性,则容器的 Bean 名称是容器的完全限定类名称加上`#n`,其中`n`对每个容器递增。
#### XML 配置
下面的示例展示了如何使用 XML 配置消息驱动的通道适配器:
```
```
### 入站通道适配器
`KafkaMessageSource`提供了一个可匹配的通道适配器实现。
#### Java 配置
```
@InboundChannelAdapter(channel = "fromKafka", poller = @Poller(fixedDelay = "5000"))
@Bean
public KafkaMessageSource source(ConsumerFactory cf) {
KafkaMessageSource source = new KafkaMessageSource<>(cf, "myTopic");
source.setGroupId("myGroupId");
source.setClientId("myClientId");
return source;
}
```
请参考 Javadocs 以获得可用的属性。
默认情况下,`max.poll.records`必须在消费工厂中显式设置,或者如果消费工厂是`DefaultKafkaConsumerFactory`,则将强制设置为 1。可以将属性`allowMultiFetch`设置为`true`以覆盖此行为。
| |你必须在`max.poll.interval.ms`内轮询消费者以避免重新平衡。
如果你将`allowMultiFetch`设置为`true`,你必须处理所有检索到的记录,并在`max.poll.interval.ms`内再次轮询。|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
此适配器发出的消息包含一个标题`kafka_remainingRecords`,其中包含上一个轮询中剩余的记录的计数。
#### Java DSL 配置
```
@Bean
public IntegrationFlow flow(ConsumerFactory cf) {
return IntegrationFlows.from(Kafka.inboundChannelAdapter(cf, "myTopic")
.groupId("myDslGroupId"), e -> e.poller(Pollers.fixedDelay(5000)))
.handle(System.out::println)
.get();
}
```
#### XML 配置
```
```
### 出站网关
出站网关用于请求/回复操作。它与大多数 Spring 集成网关的不同之处在于,发送线程不会在网关中阻塞,而应答将在应答侦听器容器线程上进行处理。如果你的代码调用同步[消息传递网关](https://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#gateway)后面的网关,则用户线程将在此阻塞,直到收到答复(或者发生超时)。
| |网关不接受请求,直到应答容器分配了它的主题和分区。
建议你在模板的应答容器属性中添加`ConsumerRebalanceListener`,并在向网关发送消息之前等待`onPartitionsAssigned`调用。|
|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
`KafkaProducerMessageHandler``sendTimeoutExpression`默认值是`delivery.timeout.ms`Kafka Producer 属性`+ 5000`,这样超时后的实际 Kafka 错误将传播到应用程序,而不是由该框架生成的超时。为了保持一致性,对此进行了更改,因为你可能会遇到意外的行为( Spring 可能会超时发送,而实际上最终是成功的)。重要提示:默认情况下,超时时间为 120 秒,因此你可能希望减少超时时间,以获得更多的及时故障。
#### Java 配置
下面的示例展示了如何使用 Java 配置网关:
```
@Bean
@ServiceActivator(inputChannel = "kafkaRequests", outputChannel = "kafkaReplies")
public KafkaProducerMessageHandler outGateway(
ReplyingKafkaTemplate kafkaTemplate) {
return new KafkaProducerMessageHandler<>(kafkaTemplate);
}
```
请参考 Javadocs 以获得可用的属性。
请注意,使用了与[出站通道适配器](#kafka-outbound)相同的类,唯一的区别是传递到构造函数的`KafkaTemplate`是`ReplyingKafkaTemplate`。有关更多信息,请参见[the Spring for Apache Kafka documentation](https://docs.spring.io/spring-kafka/docs/current/reference/html/)。
出站主题、分区、键等都是以与出站适配器相同的方式确定的。答复题目确定如下:
1. 一个名为`KafkaHeaders.REPLY_TOPIC`的消息头(如果存在,它必须具有`String`或`byte[]`值)将根据模板的回复容器的订阅主题进行验证。
2. 如果模板的`replyContainer`仅订阅了一个主题,则使用该主题。
还可以指定`KafkaHeaders.REPLY_PARTITION`头,以确定用于答复的特定分区。同样,这是根据模板的回复容器的订阅进行验证的。
#### Java DSL 配置
下面的示例展示了如何使用 Java DSL 配置出站网关:
```
@Bean
public IntegrationFlow outboundGateFlow(
ReplyingKafkaTemplate kafkaTemplate) {
return IntegrationFlows.from("kafkaRequests")
.handle(Kafka.outboundGateway(kafkaTemplate))
.channel("kafkaReplies")
.get();
}
```
或者,也可以使用类似于以下 Bean 的配置:
```
@Bean
public IntegrationFlow outboundGateFlow() {
return IntegrationFlows.from("kafkaRequests")
.handle(Kafka.outboundGateway(producerFactory(), replyContainer())
.configureKafkaTemplate(t -> t.replyTimeout(30_000)))
.channel("kafkaReplies")
.get();
}
```
#### XML 配置
```
```
### 入站网关
入站网关用于请求/回复操作。
下面的示例展示了如何使用 Java 配置入站网关:
```
@Bean
public KafkaInboundGateway inboundGateway(
AbstractMessageListenerContainercontainer,
KafkaTemplate replyTemplate) {
KafkaInboundGateway gateway =
new KafkaInboundGateway<>(container, replyTemplate);
gateway.setRequestChannel(requests);
gateway.setReplyChannel(replies);
gateway.setReplyTimeout(30_000);
return gateway;
}
```
请参考 Javadocs 以获得可用的属性。
下面的示例展示了如何使用 Java DSL 配置一个简单的大写转换器:
```
@Bean
public IntegrationFlow serverGateway(
ConcurrentMessageListenerContainer container,
KafkaTemplate replyTemplate) {
return IntegrationFlows
.from(Kafka.inboundGateway(container, replyTemplate)
.replyTimeout(30_000))
.transform(String::toUpperCase)
.get();
}
```
或者,你可以通过使用类似于以下代码来配置大写转换器:
```
@Bean
public IntegrationFlow serverGateway() {
return IntegrationFlows
.from(Kafka.inboundGateway(consumerFactory(), containerProperties(),
producerFactory())
.replyTimeout(30_000))
.transform(String::toUpperCase)
.get();
}
```
还可以使用用于`@KafkaListener`注释的容器工厂为其他目的创建`ConcurrentMessageListenerContainer`实例。有关示例,请参见[the Spring for Apache Kafka documentation](https://docs.spring.io/spring-kafka/docs/current/reference/html/)和[消息驱动通道适配器](#kafka-inbound)。
#### XML 配置
```
```
有关每个属性的描述,请参见 XML 模式。
### 由 Apache Kafka 主题支持的通道
Spring 集成具有`MessageChannel`实现,该实现由用于持久性的 Apache Kafka 主题支持。
每个通道都需要一个`KafkaTemplate`用于发送端,或者一个侦听器容器工厂(用于可订阅通道),或者一个`KafkaMessageSource`用于可检索通道。
#### Java DSL 配置
```
@Bean
public IntegrationFlow flowWithSubscribable(KafkaTemplate template,
ConcurrentKafkaListenerContainerFactory containerFactory) {
return IntegrationFlows.from(...)
...
.channel(Kafka.channel(template, containerFactory, "someTopic1").groupId("group1"))
...
.get();
}
@Bean
public IntegrationFlow flowWithPubSub(KafkaTemplate template,
ConcurrentKafkaListenerContainerFactory containerFactory) {
return IntegrationFlows.from(...)
...
.publishSubscribeChannel(pubSub(template, containerFactory),
pubsub -> pubsub
.subscribe(subflow -> ...)
.subscribe(subflow -> ...))
.get();
}
@Bean
public BroadcastCapableChannel pubSub(KafkaTemplate template,
ConcurrentKafkaListenerContainerFactory containerFactory) {
return Kafka.publishSubscribeChannel(template, containerFactory, "someTopic2")
.groupId("group2")
.get();
}
@Bean
public IntegrationFlow flowWithPollable(KafkaTemplate template,
KafkaMessageSource source) {
return IntegrationFlows.from(...)
...
.channel(Kafka.pollableChannel(template, source, "someTopic3").groupId("group3"))
.handle(..., e -> e.poller(...))
...
.get();
}
```
#### Java 配置
```
/**
* Channel for a single subscriber.
**/
@Bean
SubscribableKafkaChannel pointToPoint(KafkaTemplate template,
KafkaListenerContainerFactory factory)
SubscribableKafkaChannel channel =
new SubscribableKafkaChannel(template, factory, "topicA");
channel.setGroupId("group1");
return channel;
}
/**
* Channel for multiple subscribers.
**/
@Bean
SubscribableKafkaChannel pubsub(KafkaTemplate template,
KafkaListenerContainerFactory factory)
SubscribableKafkaChannel channel =
new SubscribableKafkaChannel(template, factory, "topicB", true);
channel.setGroupId("group2");
return channel;
}
/**
* Pollable channel (topic is configured on the source)
**/
@Bean
PollableKafkaChannel pollable(KafkaTemplate template,
KafkaMessageSource source)
PollableKafkaChannel channel =
new PollableKafkaChannel(template, source);
channel.setGroupId("group3");
return channel;
}
```
#### XML 配置
```
```
### 消息转换
a`StringJsonMessageConverter`。有关更多信息,请参见[the Spring for Apache Kafka documentation](https://docs.spring.io/spring-kafka/docs/current/reference/html/)。
当将此转换器与消息驱动通道适配器一起使用时,你可以指定要将传入的有效负载转换为哪种类型。这是通过在适配器上设置`payload-type`属性(`payloadType`属性)来实现的。下面的示例展示了如何在 XML 配置中实现这一点:
```
```
下面的示例展示了如何在 Java 配置中设置适配器上的`payload-type`属性(`payloadType`属性):
```
@Bean
public KafkaMessageDrivenChannelAdapter
adapter(KafkaMessageListenerContainer container) {
KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
kafkaMessageDrivenChannelAdapter.setMessageConverter(converter());
kafkaMessageDrivenChannelAdapter.setPayloadType(Foo.class);
return kafkaMessageDrivenChannelAdapter;
}
```
### 空有效载荷和日志压缩“墓碑”记录
Spring 消息传递`Message>`对象不能具有`null`有效负载。当你使用 Apache Kafka 的端点时,`null`有效载荷(也称为 Tombstone 记录)由类型为`KafkaNull`的有效载荷表示。有关更多信息,请参见[the Spring for Apache Kafka documentation](https://docs.spring.io/spring-kafka/docs/current/reference/html/)。
Spring 集成端点的 POJO 方法可以使用真正的`null`值,而不是`KafkaNull`值。要这样做,用`@Payload(required = false)`标记参数。下面的示例展示了如何做到这一点:
```
@ServiceActivator(inputChannel = "fromSomeKafkaInboundEndpoint")
public void in(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
@Payload(required = false) Customer customer) {
// customer is null if a tombstone record
...
}
```
### 从`KStream`调用 Spring 集成流
你可以使用`MessagingTransformer`从`KStream`调用集成流:
```
@Bean
public KStream kStream(StreamsBuilder kStreamBuilder,
MessagingTransformer transformer) transformer) {
KStream stream = kStreamBuilder.stream(STREAMING_TOPIC1);
stream.mapValues((ValueMapper) String::toUpperCase)
...
.transform(() -> transformer)
.to(streamingTopic2);
stream.print(Printed.toSysOut());
return stream;
}
@Bean
@DependsOn("flow")
public MessagingTransformer transformer(
MessagingFunction function) {
MessagingMessageConverter converter = new MessagingMessageConverter();
converter.setHeaderMapper(new SimpleKafkaHeaderMapper("*"));
return new MessagingTransformer<>(function, converter);
}
@Bean
public IntegrationFlow flow() {
return IntegrationFlows.from(MessagingFunction.class)
...
.get();
}
```
当一个集成流从一个接口开始时,所创建的代理具有该流的名称 Bean,并附加了“.gateway”,因此如果需要,这个 Bean 名称可以使用 a`@Qualifier`。
### 读/处理/写场景的性能注意事项
许多应用程序使用一个主题,执行一些处理并写入另一个主题。在大多数情况下,如果写入失败,应用程序将希望抛出一个异常,以便可以重试传入的请求并/或将其发送到一个死信主题。该功能由底层消息侦听器容器以及适当配置的错误处理程序支持。然而,为了支持这一点,我们需要阻塞侦听器线程,直到写操作成功(或失败),以便可以将任何异常抛到容器中。当使用单个记录时,可以通过在出站适配器上设置`sync`属性来实现。然而,当使用批处理时,使用`sync`会导致显著的性能下降,因为应用程序将在发送下一条消息之前等待每次发送的结果。你还可以执行多个发送,然后等待这些发送的结果。这是通过向消息处理程序添加`futuresChannel`来实现的。要启用该功能,请将`KafkaIntegrationHeaders.FUTURE_TOKEN`添加到出站消息中;然后可以使用此功能将`Future`关联到特定的已发送消息。下面是一个如何使用此功能的示例:
```
@SpringBootApplication
public class FuturesChannelApplication {
public static void main(String[] args) {
SpringApplication.run(FuturesChannelApplication.class, args);
}
@Bean
IntegrationFlow inbound(ConsumerFactory consumerFactory, Handler handler) {
return IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(consumerFactory,
ListenerMode.batch, "inTopic"))
.handle(handler)
.get();
}
@Bean
IntegrationFlow outbound(KafkaTemplate kafkaTemplate) {
return IntegrationFlows.from(Gate.class)
.enrichHeaders(h -> h
.header(KafkaHeaders.TOPIC, "outTopic")
.headerExpression(KafkaIntegrationHeaders.FUTURE_TOKEN, "headers[id]"))
.handle(Kafka.outboundChannelAdapter(kafkaTemplate)
.futuresChannel("futures"))
.get();
}
@Bean
PollableChannel futures() {
return new QueueChannel();
}
}
@Component
@DependsOn("outbound")
class Handler {
@Autowired
Gate gate;
@Autowired
PollableChannel futures;
public void handle(List input) throws Exception {
System.out.println(input);
input.forEach(str -> this.gate.send(str.toUpperCase()));
for (int i = 0; i < input.size(); i++) {
Message> future = this.futures.receive(10000);
((Future>) future.getPayload()).get(10, TimeUnit.SECONDS);
}
}
}
interface Gate {
void send(String out);
}
```