sendAndReceive(Message> message);
RequestReplyTypedMessageFuture sendAndReceive(Message> message,
ParameterizedTypeReference returnType);
```
这些将使用模板的默认`replyTimeout`,也有重载版本可以在方法调用中占用超时时间。
如果使用者的`Deserializer`或模板的`MessageConverter`可以通过配置或在回复消息中键入元数据来转换有效负载,而不需要任何其他信息,请使用第一种方法。
如果需要为返回类型提供类型信息,请使用第二种方法来帮助消息转换器。这还允许相同的模板接收不同的类型,即使在答复中没有类型元数据,例如当服务器端不是 Spring 应用程序时也是如此。以下是后者的一个例子:
例 6.模板 Bean
Java
```
@Bean
ReplyingKafkaTemplate template(
ProducerFactory pf,
ConcurrentKafkaListenerContainerFactory factory) {
ConcurrentMessageListenerContainer replyContainer =
factory.createContainer("replies");
replyContainer.getContainerProperties().setGroupId("request.replies");
ReplyingKafkaTemplate template =
new ReplyingKafkaTemplate<>(pf, replyContainer);
template.setMessageConverter(new ByteArrayJsonMessageConverter());
template.setDefaultTopic("requests");
return template;
}
```
Kotlin
```
@Bean
fun template(
pf: ProducerFactory?,
factory: ConcurrentKafkaListenerContainerFactory
): ReplyingKafkaTemplate {
val replyContainer = factory.createContainer("replies")
replyContainer.containerProperties.groupId = "request.replies"
val template = ReplyingKafkaTemplate(pf, replyContainer)
template.messageConverter = ByteArrayJsonMessageConverter()
template.defaultTopic = "requests"
return template
}
```
例 7.使用模板
Java
```
RequestReplyTypedMessageFuture future1 =
template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
new ParameterizedTypeReference() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());
RequestReplyTypedMessageFuture> future2 =
template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
new ParameterizedTypeReference>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));
```
Kotlin
```
val future1: RequestReplyTypedMessageFuture? =
template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
object : ParameterizedTypeReference() {})
log.info(future1?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata?.toString())
val thing = future1?.get(10, TimeUnit.SECONDS)?.payload
log.info(thing.toString())
val future2: RequestReplyTypedMessageFuture?>? =
template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
object : ParameterizedTypeReference?>() {})
log.info(future2?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata.toString())
val things = future2?.get(10, TimeUnit.SECONDS)?.payload
things?.forEach(Consumer { thing1: Thing? -> log.info(thing1.toString()) })
```
##### 回复类型消息 \\>
当`@KafkaListener`返回`Message>`时,在版本为 2.5 之前的情况下,需要填充回复主题和相关 ID 头。在本例中,我们使用请求中的回复主题标头:
```
@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.TOPIC, replyTo)
.setHeader(KafkaHeaders.MESSAGE_KEY, 42)
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.build();
}
```
这也显示了如何在回复记录上设置一个键。
从版本 2.5 开始,该框架将检测这些标题是否丢失,并用主题填充它们-从`@SendTo`值确定的主题或传入的`KafkaHeaders.REPLY_TOPIC`标题(如果存在)。如果存在,它还将响应传入的`KafkaHeaders.CORRELATION_ID`和`KafkaHeaders.REPLY_PARTITION`。
```
@KafkaListener(id = "requestor", topics = "request")
@SendTo // default REPLY_TOPIC header
public Message> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.MESSAGE_KEY, 42)
.build();
}
```
##### 聚合多个回复
[使用`ReplyingKafkaTemplate`](#replying-template)中的模板严格用于单个请求/回复场景。对于单个消息的多个接收者返回答复的情况,可以使用`AggregatingReplyingKafkaTemplate`。这是[散-集 Enterprise 集成模式](https://www.enterpriseintegrationpatterns.com/patterns/messaging/BroadcastAggregate.html)客户端的一个实现。
与`ReplyingKafkaTemplate`类似,`AggregatingReplyingKafkaTemplate`构造函数需要一个生产者工厂和一个侦听器容器来接收回复;它有第三个参数`BiPredicate>, Boolean> releaseStrategy`,在每次接收到回复时都会查询这个参数;当谓词返回`true`时,`ConsumerRecord`s 的集合用于完成由`sendAndReceive`方法返回的`Future`。
还有一个额外的属性`returnPartialOnTimeout`(默认为 false)。当这被设置为`true`时,而不是用`KafkaReplyTimeoutException`来完成 future,部分结果通常会完成 future(只要至少收到了一条回复记录)。
从版本 2.3.5 开始,在超时之后也调用谓词(如果`returnPartialOnTimeout`是`true`)。第一个参数是当前的记录列表;第二个参数是`true`,如果这个调用是由于超时引起的。谓词可以修改记录列表。
```
AggregatingReplyingKafkaTemplate template =
new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
coll -> coll.size() == releaseSize);
...
RequestReplyFuture>> future =
template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord>> consumerRecord =
future.get(30, TimeUnit.SECONDS);
```
请注意,返回类型是`ConsumerRecord`,其值是`ConsumerRecord`s 的集合。该“外”`ConsumerRecord`不是一个“真实”的记录,它是由模板合成的,作为实际接收到的回复记录的持有者用于请求。当正常的发布发生时(Release Strategy 返回 true),主题设置为`aggregatedResults`;如果`returnPartialOnTimeout`为真,并且发生超时(并且至少收到了一条回复记录),主题设置为`partialResultsAfterTimeout`。模板为这些“主题”名称提供了常量静态变量:
```
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a normal release by the release strategy.
*/
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a timeout.
*/
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";
```
在`Collection`中,真正的`ConsumerRecord`包含接收答复的实际主题。
| |回复的侦听器容器必须配置为`AckMode.MANUAL`或`AckMode.MANUAL_IMMEDIATE`;消费者属性`enable.auto.commit`必须是`false`(自版本 2.3 以来的默认设置)。
为了避免丢失消息的可能性,模板仅在未完成请求为零的情况下提交偏移,即当发布策略发布最后一个未完成的请求时。
在重新平衡之后,有可能出现重复的回复发送;对于任何飞行中的请求,这些将被忽略;对于已经发布的回复,当收到重复的回复时,你可能会看到错误日志消息。|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| |如果使用[`ErrorHandlingDeserializer`](#error-handling-deserializer)与此聚合模板,框架将不会自动检测`DeserializationException`s.
相反,记录(带有`null`值)将原封不动地返回,使用头文件中的反序列化异常。
建议应用程序调用实用程序方法`ReplyingKafkaTemplate.checkDeserialization()`方法来确定如果发生反序列化异常。
有关更多信息,请参见其 Javadocs。
此聚合模板也不会调用`replyErrorChecker`;你应该对回复的每个元素执行检查。|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
#### 4.1.4.接收消息
可以通过配置`MessageListenerContainer`并提供消息侦听器或使用`@KafkaListener`注释来接收消息。
##### 消息侦听器
当使用[消息侦听器容器](#message-listener-container)时,必须提供一个侦听器来接收数据。目前,消息侦听器有八个受支持的接口。下面的清单展示了这些接口:
```
public interface MessageListener { (1)
void onMessage(ConsumerRecord data);
}
public interface AcknowledgingMessageListener { (2)
void onMessage(ConsumerRecord data, Acknowledgment acknowledgment);
}
public interface ConsumerAwareMessageListener extends MessageListener { (3)
void onMessage(ConsumerRecord data, Consumer, ?> consumer);
}
public interface AcknowledgingConsumerAwareMessageListener extends MessageListener { (4)
void onMessage(ConsumerRecord data, Acknowledgment acknowledgment, Consumer, ?> consumer);
}
public interface BatchMessageListener { (5)
void onMessage(List> data);
}
public interface BatchAcknowledgingMessageListener { (6)
void onMessage(List> data, Acknowledgment acknowledgment);
}
public interface BatchConsumerAwareMessageListener extends BatchMessageListener { (7)
void onMessage(List> data, Consumer, ?> consumer);
}
public interface BatchAcknowledgingConsumerAwareMessageListener extends BatchMessageListener { (8)
void onMessage(List> data, Acknowledgment acknowledgment, Consumer, ?> consumer);
}
```
|**1**|当使用自动提交或容器管理的[提交方法](#committing-offsets)操作时,使用此接口处理从 Kafka 使用者`poll()`接收的单个`ConsumerRecord`实例。|
|-----|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|**2**|在使用[提交方法](#committing-offsets)中的一种手动操作时,使用此接口处理从 Kafka 使用者`poll()`接收到的单个`ConsumerRecord`实例。|
|**3**|当使用自动提交或容器管理的[提交方法](#committing-offsets)中的一个操作时,使用此接口处理从 Kafka 使用者`ConsumerRecord`接收的单个`ConsumerRecord`实例。
提供了对`Consumer`对象的访问。|
|**4**|使用此接口处理从 Kafka 使用者`ConsumerRecord`接收到的单个`poll()`实例时使用的手动[提交方法](#committing-offsets)中的一个操作。
提供了对`Consumer`对象的访问。|
|**5**|当使用自动提交或容器管理的[提交方法](#committing-offsets)操作时,使用此接口处理从 Kafka 使用者`poll()`接收到的所有`ConsumerRecord`实例。当你使用此接口时,不支持`AckMode.RECORD`,因为给了侦听器完整的批处理。|
|**6**|使用此接口处理从 Kafka 使用者`ConsumerRecord`接收到的所有`poll()`实例时,使用其中一个手动[提交方法](#committing-offsets)操作。|
|**7**|在使用自动提交或容器管理的[提交方法](#committing-offsets)操作时,使用此接口处理从 Kafka 使用者`ConsumerRecord`接收的所有`poll()`实例,当你使用此接口时,不支持`AckMode.RECORD`,因为给了侦听器完整的批处理。
提供了对`Consumer`对象的访问。|
|**8**|使用此接口处理从 Kafka 使用者`ConsumerRecord`接收到的所有`poll()`实例,当使用其中一个手动[提交方法](#committing-offsets)操作时。
提供了对`Consumer`对象的访问。|
| |`Consumer`对象不是线程安全的。
你必须仅在调用侦听器的线程上调用它的方法。|
|---|---------------------------------------------------------------------------------------------------------------------|
| |你不应该执行任何`Consumer, ?>`方法,这些方法会影响用户在监听器中的位置和或提交偏移;容器需要管理这些信息。|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
##### 消息侦听器容器
提供了两个`MessageListenerContainer`实现:
* `KafkaMessageListenerContainer`
* `ConcurrentMessageListenerContainer`
`KafkaMessageListenerContainer`接收来自单个线程上所有主题或分区的所有消息。`ConcurrentMessageListenerContainer`将委托给一个或多个`KafkaMessageListenerContainer`实例,以提供多线程消耗。
从版本 2.2.7 开始,你可以将`RecordInterceptor`添加到侦听器容器;在调用侦听器允许检查或修改记录之前,将调用它。如果拦截器返回 null,则不调用侦听器。从版本 2.7 开始,它有额外的方法,在侦听器退出后调用这些方法(通常是通过抛出异常)。此外,从版本 2.7 开始,现在有一个`BatchInterceptor`,为[批处理侦听器](#batch-listeners)提供类似的功能。此外,`ConsumerAwareRecordInterceptor`(和`BatchInterceptor`)提供对`Consumer, ?>`的访问。例如,这可以用来访问拦截器中的消费者指标。
| |你不应该在这些拦截器中执行任何影响使用者位置或提交偏移的方法;容器需要管理这些信息。|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
`CompositeRecordInterceptor`和`CompositeBatchInterceptor`可用于调用多个拦截器。
默认情况下,从版本 2.8 开始,当使用事务时,拦截器在事务启动之前被调用。你可以将侦听器容器的`interceptBeforeTx`属性设置为`false`,以便在事务启动后调用拦截器。
从版本 2.3.8、2.4.6 开始,当并发性大于 1 时,`ConcurrentMessageListenerContainer`现在支持[静态成员](https://kafka.apache.org/documentation/#static_membership)。`group.instance.id`后缀为`-n`,后缀为`n`,起始于`1`。这与增加的`session.timeout.ms`一起,可以用来减少重新平衡事件,例如,当应用程序实例重新启动时。
###### 使用`KafkaMessageListenerContainer`
以下构造函数可用:
```
public KafkaMessageListenerContainer(ConsumerFactory consumerFactory,
ContainerProperties containerProperties)
```
它在`ContainerProperties`对象中接收`ConsumerFactory`和有关主题和分区以及其他配置的信息。`ContainerProperties`具有以下构造函数:
```
public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)
```
第一个构造函数接受一个由`TopicPartitionOffset`参数组成的数组,以显式地指示容器使用哪些分区(使用 Consumer`assign()`方法),并使用一个可选的初始偏移量。在默认情况下,正值是绝对的偏移量。在默认情况下,负值是相对于分区中当前的最后一个偏移量的。为`TopicPartitionOffset`提供了一个构造函数,它接受一个额外的`boolean`参数。如果这是`true`,则初始偏移(正或负)相对于此消费者的当前位置。当容器启动时,将应用这些偏移量。第二个是一个主题数组,Kafka 基于`group.id`属性(在整个组中分发分区)分配分区。第三种使用 regex`Pattern`来选择主题。
要将`MessageListener`分配给容器,可以在创建容器时使用`ContainerProps.setMessageListener`方法。下面的示例展示了如何做到这一点:
```
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener() {
...
});
DefaultKafkaConsumerFactory cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
```
请注意,当创建`DefaultKafkaConsumerFactory`时,使用只接收上述属性的构造函数意味着从配置中提取键和值`Deserializer`类。或者,`Deserializer`实例可以传递给`DefaultKafkaConsumerFactory`构造函数,用于键和/或值,在这种情况下,所有消费者共享相同的实例。另一种选择是提供`Supplier`s(从版本 2.3 开始),用于为每个`Consumer`获取单独的`Deserializer`实例:
```
DefaultKafkaConsumerFactory cf =
new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
```
有关可以设置的各种属性的更多信息,请参见[Javadoc](https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ContainerProperties.html)for`ContainerProperties`。
自版本 2.1.1 以来,一个名为`logContainerConfig`的新属性可用。当启用`true`和`INFO`日志记录时,每个侦听器容器写一个日志消息,总结其配置属性。
默认情况下,主题偏移提交的日志记录是在`DEBUG`日志级别执行的。从版本 2.1.2 开始,`ContainerProperties`中的一个名为`commitLogLevel`的属性允许你为这些消息指定日志级别。例如,要将日志级别更改为`INFO`,可以使用`containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);`。
从版本 2.2 开始,添加了一个名为`missingTopicsFatal`的新容器属性(默认值:`false`自 2.3.4 起)。如果代理上不存在任何已配置的主题,这将阻止容器启动。如果容器被配置为侦听主题模式(regex),则不会应用该选项。以前,容器线程在`consumer.poll()`方法中循环运行,等待在记录许多消息时出现主题。除了日志之外,没有迹象表明存在问题。
从版本 2.8 开始,引入了一个新的容器属性`authExceptionRetryInterval`。这将导致容器在从`KafkaConsumer`获取任何`AuthenticationException`或`AuthorizationException`后重试获取消息。例如,当被配置的用户被拒绝读取某个主题或凭据不正确时,就会发生这种情况。定义`authExceptionRetryInterval`允许容器在授予适当权限时恢复。
| |默认情况下,不会配置间隔——身份验证和授权错误被认为是致命的,这会导致容器停止。|
|---|-----------------------------------------------------------------------------------------------------------------------------------------|
从版本 2.8 开始,在创建消费者工厂时,如果你将反序列化器作为对象(在构造函数中或通过 setter)提供,工厂将调用`configure()`方法来使用配置属性对它们进行配置。
###### 使用`ConcurrentMessageListenerContainer`
单个构造函数类似于`KafkaListenerContainer`构造函数。下面的清单显示了构造函数的签名:
```
public ConcurrentMessageListenerContainer(ConsumerFactory consumerFactory,
ContainerProperties containerProperties)
```
它还具有`concurrency`属性。例如,`container.setConcurrency(3)`创建了三个`KafkaMessageListenerContainer`实例。
对于第一个构造函数,Kafka 使用其组管理功能在消费者之间分配分区。
| |当监听多个主题时,默认的分区分布可能不是你期望的那样,
例如,如果你有三个主题,每个主题有五个分区,并且希望使用`concurrency=15`,那么你只会看到五个活动的使用者,每个使用者从每个主题分配一个分区,
这是因为默认的 Kafka`PartitionAssignor`是`RangeAssignor`(参见其 Javadoc)。
对于这种情况,你可能想要考虑使用`RoundRobinAssignor`代替,它将分区分布在所有的消费者之间。,每个使用者被分配一个主题或分区。
要更改`PartitionAssignor`,可以将`partition.assignment.strategy`消费者属性(`ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG`)中提供的属性设置为
在使用 Spring 引导时,可以将策略设置为:
r=“723”/>消费者属性。|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
当容器属性配置为`TopicPartitionOffset`s 时,`ConcurrentMessageListenerContainer`将`TopicPartitionOffset`实例分布在委托`KafkaMessageListenerContainer`实例中。
假设提供了六个`TopicPartitionOffset`实例,并且`concurrency`是`3`;每个容器都有两个分区。对于五个`TopicPartitionOffset`实例,两个容器获得两个分区,第三个容器获得一个分区。如果`concurrency`大于`TopicPartitions`的个数,则对`concurrency`进行向下调整,以便每个容器获得一个分区。
| |`client.id`属性(如果设置)以`-n`附加,其中`n`是对应于并发性的消费者实例。
这是在启用 JMX 时为 MBean 提供唯一名称所必需的。|
|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
从版本 1.3 开始,`MessageListenerContainer`提供对底层`KafkaConsumer`的度量的访问。在`ConcurrentMessageListenerContainer`的情况下,`metrics()`方法返回所有目标`KafkaMessageListenerContainer`实例的度量。度量值由为底层`KafkaConsumer`提供的`client-id`分组为`Map`。
从版本 2.3 开始,`ContainerProperties`提供了一个`idleBetweenPolls`选项,让侦听器容器中的主循环在`KafkaConsumer.poll()`调用之间休眠。从所提供的选项和`max.poll.interval.ms`消费者配置和当前记录批处理时间之间的差值中选择一个实际的睡眠间隔作为最小值。
###### 提交偏移
为提交偏移提供了几个选项。如果`enable.auto.commit`消费者属性是`true`,Kafka 将根据其配置自动提交偏移。如果是`false`,则容器支持几个`AckMode`设置(在下一个列表中进行了描述)。默认的`AckMode`是`BATCH`。从版本 2.3 开始,该框架将`enable.auto.commit`设置为`false`,除非在配置中明确设置。以前,如果未设置属性,则使用 Kafka 默认值(`true`)。
消费者`poll()`方法返回一个或多个`ConsumerRecords`。为每个记录调用`MessageListener`。下面的列表描述了容器为每个`AckMode`(不使用事务时)所采取的操作:
* `RECORD`:在侦听器在处理完记录后返回时提交偏移量。
* `BATCH`:在处理完`poll()`返回的所有记录后提交偏移量。
* `TIME`:在`poll()`返回的所有记录都已被处理的情况下提交偏移量,只要`ackTime`自上次提交以来的偏移量已被超过。
* `COUNT`:提交当`poll()`返回的所有记录都已被处理时的偏移量,只要`ackCount`记录自上次提交以来一直被接收。
* `COUNT_TIME`:类似于`TIME`和`COUNT`,但如果任一条件是`true`,则执行提交。
* `MANUAL`:消息侦听器负责`acknowledge()`的`Acknowledgment`。在此之后,将应用与`BATCH`相同的语义。
* `MANUAL_IMMEDIATE`:当侦听器调用`Acknowledgment.acknowledge()`方法时,立即提交偏移量。
当使用[交易](#transactions)时,偏移量被发送到事务,语义等价于`RECORD`或`BATCH`,这取决于侦听器类型(记录或批处理)。
| |`MANUAL`和`MANUAL_IMMEDIATE`要求侦听器是`AcknowledgingMessageListener`或`BatchAcknowledgingMessageListener`。
参见[消息侦听器](#message-listeners)。|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
根据`syncCommits`容器属性,将使用消费者上的`commitSync()`或`commitAsync()`方法。`syncCommits`默认情况下是`true`;还请参见`setSyncCommitTimeout`。参见`setCommitCallback`以获取异步提交的结果;默认的回调是`LoggingCommitCallback`,它记录错误(和调试级别的成功)。
因为侦听器容器有自己的提交偏移的机制,所以它更喜欢 kafka`ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG`为`false`。从版本 2.3 开始,它无条件地将其设置为 false,除非在消费者工厂或容器的消费者属性重写中专门设置了 false。
`Acknowledgment`具有以下方法:
```
public interface Acknowledgment {
void acknowledge();
}
```
此方法使侦听器能够控制何时提交偏移。
从版本 2.3 开始,`Acknowledgment`接口有两个额外的方法`nack(long sleep)`和`nack(int index, long sleep)`。第一个用于记录侦听器,第二个用于批处理侦听器。为侦听器类型调用错误的方法将抛出`IllegalStateException`。
| |如果要使用`nack()`提交部分批处理,则在使用事务时,将`AckMode`设置为`MANUAL`;调用`nack()`将成功处理的记录的偏移量发送到事务。|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| |`nack()`只能在调用侦听器的使用者线程上调用。|
|---|------------------------------------------------------------------------------|
对于记录侦听器,当调用`nack()`时,将提交任何挂起的偏移量,丢弃上一次轮询的重置记录,并在它们的分区上执行查找,以便在下一次`poll()`上重新交付失败的记录和未处理的记录。通过设置`sleep`参数,可以在重新交付之前暂停使用者线程。这类似于当容器配置为`DefaultErrorHandler`时抛出异常的功能。
使用批处理侦听器时,可以在发生故障的批处理中指定索引。当调用`nack()`时,将对记录提交偏移,然后在分区上对失败和丢弃的记录执行索引和查找,以便在下一个`poll()`上重新交付它们。
有关更多信息,请参见[容器错误处理程序](#error-handlers)。
| |当通过组管理使用分区分配时,重要的是要确保`sleep`参数(加上处理来自上一次投票的记录所花费的时间)小于消费者`max.poll.interval.ms`属性。|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
###### 侦听器容器自动启动
侦听器容器实现`SmartLifecycle`,而`autoStartup`默认情况下是`true`。容器在后期启动(`Integer.MAX-VALUE - 100`)。实现`SmartLifecycle`以处理来自侦听器的数据的其他组件应该在较早的阶段启动。`- 100`为后面的阶段留出了空间,以使组件能够在容器之后自动启动。
##### 手动提交偏移
通常,当使用`AckMode.MANUAL`或`AckMode.MANUAL_IMMEDIATE`时,必须按顺序确认确认,因为 Kafka 不为每个记录维护状态,只为每个组/分区维护一个提交的偏移量。从版本 2.8 开始,你现在可以设置容器属性`asyncAcks`,它允许以任何顺序确认投票返回的记录的确认。侦听器容器将推迟顺序外的提交,直到收到缺少的确认。消费者将被暂停(没有新的记录交付),直到前一次投票的所有补偿都已提交。
| |虽然该特性允许应用程序异步处理记录,但应该理解的是,它增加了在发生故障后重复交付的可能性。|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
##### `@KafkaListener`注释
`@KafkaListener`注释用于指定 Bean 方法作为侦听器容器的侦听器。 Bean 包装在`MessagingMessageListenerAdapter`中配置有各种特征,例如转换器来转换数据,如果需要,以匹配该方法的参数。
可以使用`#{…}`或属性占位符(`${…}`)使用 SPEL 配置注释上的大多数属性。有关更多信息,请参见[Javadoc](https://docs.spring.io/spring-kafka/api/org/springframework/kafka/annotation/KafkaListener.html)。
###### 记录收听者
`@KafkaListener`注释为简单的 POJO 侦听器提供了一种机制。下面的示例展示了如何使用它:
```
public class Listener {
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
...
}
}
```
这种机制需要在你的`@Configuration`类中的一个上进行`@EnableKafka`注释,并需要一个侦听器容器工厂,该工厂用于配置底层`ConcurrentMessageListenerContainer`。在缺省情况下,一个名称`kafkaListenerContainerFactory`的 Bean 是期望的。下面的示例展示了如何使用`ConcurrentMessageListenerContainer`:
```
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map consumerConfigs() {
Map props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
...
return props;
}
}
```
注意,要设置容器属性,必须在工厂上使用`getContainerProperties()`方法。它被用作注入到容器中的实际属性的模板。
从版本 2.1.1 开始,你现在可以为由注释创建的消费者设置`client.id`属性。`clientIdPrefix`后缀为`-n`,其中`n`是表示使用并发性时容器号的整数。
从版本 2.2 开始,你现在可以通过在注释本身上使用属性来覆盖容器工厂的`concurrency`和`autoStartup`属性。这些属性可以是简单值、属性占位符或 SPEL 表达式。下面的示例展示了如何做到这一点:
```
@KafkaListener(id = "myListener", topics = "myTopic",
autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
...
}
```
###### 显式分区分配
你还可以使用显式的主题和分区(以及它们的初始偏移量)来配置 POJO 侦听器。下面的示例展示了如何做到这一点:
```
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord, ?> record) {
...
}
```
你可以在`partitions`或`partitionOffsets`属性中指定每个分区,但不能同时指定这两个分区。
与大多数注释属性一样,你可以使用 SPEL 表达式;有关如何生成一个大的分区列表的示例,请参见[[tip-assign-all-parts]。
从版本 2.5.5 开始,你可以对所有分配的分区应用初始偏移量:
```
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" },
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
})
public void listen(ConsumerRecord, ?> record) {
...
}
```
`*`通配符表示`partitions`属性中的所有分区。每个`@TopicPartition`中必须只有一个带有通配符的`@PartitionOffset`。
此外,当侦听器实现`ConsumerSeekAware`时,现在调用`onPartitionsAssigned`,即使在使用手动分配时也是如此。例如,这允许在那个时候进行任意的查找操作。
从版本 2.6.4 开始,你可以指定一个以逗号分隔的分区列表,或分区范围:
```
@KafkaListener(id = "pp", autoStartup = "false",
topicPartitions = @TopicPartition(topic = "topic1",
partitions = "0-5, 7, 10-15"))
public void process(String in) {
...
}
```
范围是包含的;上面的示例将分配分区`0, 1, 2, 3, 4, 5, 7, 10, 11, 12, 13, 14, 15`。
在指定初始偏移量时可以使用相同的技术:
```
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1",
partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
})
public void listen(ConsumerRecord, ?> record) {
...
}
```
初始偏移量将应用于所有 6 个分区。
###### 手动确认
当使用 Manual`AckMode`时,还可以向监听器提供`Acknowledgment`。下面的示例还展示了如何使用不同的容器工厂。
```
@KafkaListener(id = "cat", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
```
###### 消费者记录元数据
最后,关于记录的元数据可以从消息头获得。你可以使用以下头名称来检索消息的头:
* `KafkaHeaders.OFFSET`
* `KafkaHeaders.RECEIVED_MESSAGE_KEY`
* `KafkaHeaders.RECEIVED_TOPIC`
* `KafkaHeaders.RECEIVED_PARTITION_ID`
* `KafkaHeaders.RECEIVED_TIMESTAMP`
* `KafkaHeaders.TIMESTAMP_TYPE`
从版本 2.5 开始,如果传入的记录具有`null`键,则不存在`RECEIVED_MESSAGE_KEY`;以前,头被填充为`null`值。此更改是为了使框架与`spring-messaging`约定保持一致,其中不存在`null`值标头。
下面的示例展示了如何使用标题:
```
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
...
}
```
从版本 2.5 开始,你可以在`ConsumerRecordMetadata`参数中接收记录元数据,而不是使用离散的头。
```
@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
...
}
```
这包含来自`ConsumerRecord`的所有数据,除了键和值。
###### 批处理侦听器
从版本 1.1 开始,你可以配置`@KafkaListener`方法来接收从消费者投票中接收到的整批消费者记录。要将侦听器容器工厂配置为创建批处理侦听器,你可以设置`batchListener`属性。下面的示例展示了如何做到这一点:
```
@Bean
public KafkaListenerContainerFactory, ?> batchFactory() {
ConcurrentKafkaListenerContainerFactory factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true); // <<<<<<<<<<<<<<<<<<<<<<<<<
return factory;
}
```
| |从版本 2.8 开始,你可以使用`@KafkaListener`注释上的`batch`属性重写工厂的`batchListener`Propery。
这一点以及对[容器错误处理程序](#error-handlers)的更改允许对记录和批处理侦听器使用相同的工厂。|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
下面的示例展示了如何接收有效载荷列表:
```
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List list) {
...
}
```
主题、分区、偏移量等在与有效负载并行的标题中可用。下面的示例展示了如何使用标题:
```
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List list,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List keys,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List topics,
@Header(KafkaHeaders.OFFSET) List offsets) {
...
}
```
或者,可以接收一个`List`的`Message>`对象与每个偏移和每个消息中的其他详细信息,但是它必须是在方法上定义的唯一参数(除了可选的`Acknowledgment`,当使用手动提交时,和/或`Consumer, ?>`参数)。下面的示例展示了如何做到这一点:
```
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen14(List> list) {
...
}
@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen15(List> list, Acknowledgment ack) {
...
}
@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen16(List> list, Acknowledgment ack, Consumer, ?> consumer) {
...
}
```
在这种情况下,不对有效负载执行任何转换。
如果`BatchMessagingMessageConverter`被配置为`RecordMessageConverter`,那么你还可以向`Message`参数添加一个泛型类型,然后对有效负载进行转换。有关更多信息,请参见[使用批处理侦听器的有效负载转换](#payload-conversion-with-batch)。
你还可以接收`ConsumerRecord, ?>`对象的列表,但它必须是方法上定义的唯一参数(除了可选的`Acknowledgment`,当使用手动提交和`Consumer, ?>`参数时)。下面的示例展示了如何做到这一点:
```
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List> list) {
...
}
@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List> list, Acknowledgment ack) {
...
}
```
从版本 2.2 开始,侦听器可以接收由`poll()`方法返回的完整`ConsumerRecords, ?>`对象,让侦听器访问其他方法,例如`partitions()`(它返回列表中的`TopicPartition`实例)和`records(TopicPartition)`(它获得选择性记录)。同样,这必须是方法上唯一的参数(除了可选的`Acknowledgment`,当使用手动提交或`Consumer, ?>`参数时)。下面的示例展示了如何做到这一点:
```
@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords, ?> records) {
...
}
```
| |如果容器工厂配置了`RecordFilterStrategy`,则对于`ConsumerRecords, ?>`侦听器将忽略它,并发出`WARN`日志消息。
如果使用`>`形式的侦听器,则只能使用批侦听器过滤记录。默认情况下,
,记录是一次过滤一次的;从版本 2.8 开始,你可以覆盖`filterBatch`以在一个调用中过滤整个批处理。|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
###### 注释属性
从版本 2.0 开始,`id`属性(如果存在)被用作 Kafka Consumer`group.id`属性,如果存在,则覆盖 Consumer 工厂中的配置属性。还可以显式地将`groupId`设置为`idIsGroup`,也可以将`idIsGroup`设置为 false,以恢复以前使用消费者工厂`group.id`的行为。
你可以在大多数注释属性中使用属性占位符或 SPEL 表达式,如下例所示:
```
@KafkaListener(topics = "${some.property}")
@KafkaListener(topics = "#{someBean.someProperty}",
groupId = "#{someBean.someProperty}.group")
```
从版本 2.1.2 开始,SPEL 表达式支持一个特殊的令牌:`__listener`。它是一个伪 Bean 名称,表示存在此注释的当前 Bean 实例。
考虑以下示例:
```
@Bean
public Listener listener1() {
return new Listener("topic1");
}
@Bean
public Listener listener2() {
return new Listener("topic2");
}
```
考虑到前面示例中的 bean,我们可以使用以下方法:
```
public class Listener {
private final String topic;
public Listener(String topic) {
this.topic = topic;
}
@KafkaListener(topics = "#{__listener.topic}",
groupId = "#{__listener.topic}.group")
public void listen(...) {
...
}
public String getTopic() {
return this.topic;
}
}
```
如果在不太可能的情况下,你有一个实际的 Bean 名为`__listener`,那么你可以使用`beanRef`属性来更改表达式标记。下面的示例展示了如何做到这一点:
```
@KafkaListener(beanRef = "__x", topics = "#{__x.topic}",
groupId = "#{__x.topic}.group")
```
从版本 2.2.4 开始,你可以直接在注释中指定 Kafka 消费者属性,这些属性将覆盖在消费者工厂中配置的具有相同名称的任何属性。以这种方式指定**不能**和`client.id`属性;它们将被忽略;对这些属性使用`groupId`和`clientIdPrefix`注释属性。
这些属性被指定为具有普通 Java`Properties`文件格式的单个字符串:`foo:bar`,`foo=bar`,或`foo bar`。
```
@KafkaListener(topics = "myTopic", groupId = "group", properties = {
"max.poll.interval.ms:60000",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})
```
下面是[使用`RoutingKafkaTemplate`](#routing-template)示例中的相应侦听器的示例。
```
@KafkaListener(id = "one", topics = "one")
public void listen1(String in) {
System.out.println("1: " + in);
}
@KafkaListener(id = "two", topics = "two",
properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
System.out.println("2: " + new String(in));
}
```
##### 获取消费者`group.id`
当在多个容器中运行相同的侦听器代码时,能够确定记录来自哪个容器(由其`group.id`消费者属性标识)可能是有用的。
你可以在侦听器线程上调用`KafkaUtils.getConsumerGroupId()`来执行此操作。或者,你可以访问方法参数中的组 ID。
```
@KafkaListener(id = "bar", topicPattern = "${topicTwo:annotated2}", exposeGroupId = "${always:true}")
public void listener(@Payload String foo,
@Header(KafkaHeaders.GROUP_ID) String groupId) {
...
}
```
| |这在接收`List>`记录的记录侦听器和批处理侦听器中可用。**不是**在接收`ConsumerRecords, ?>`参数的批处理侦听器中可用。
在这种情况下使用`KafkaUtils`机制。|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
##### 容器线程命名
侦听器容器当前使用两个任务执行器,一个用于调用使用者,另一个用于在 Kafka 消费者属性`enable.auto.commit`为`false`时调用侦听器。你可以通过设置容器的`consumerExecutor`和`listenerExecutor`属性来提供自定义执行器。当使用池执行程序时,确保有足够多的线程可用来处理使用它们的所有容器之间的并发性。当使用`ConcurrentMessageListenerContainer`时,来自每个使用者的线程都用于每个使用者(`concurrency`)。
如果不提供消费者执行器,则使用`SimpleAsyncTaskExecutor`。此执行器创建名称与`-C-1`(使用者线程)类似的线程。对于`ConcurrentMessageListenerContainer`,线程名称的``部分变成`-m`,其中`m`表示消费者实例。`n`每次启动容器时都会增加。所以,具有 Bean 名称的`container`,此容器中的线程将被命名为`container-0-C-1`、`container-1-C-1`等,在容器被第一次启动之后;`container-0-C-2`、`container-1-C-2`等,在停止之后又被随后的启动。
##### `@KafkaListener`作为元注释
从版本 2.2 开始,你现在可以使用`@KafkaListener`作为元注释。下面的示例展示了如何做到这一点:
```
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@KafkaListener
public @interface MyThreeConsumersListener {
@AliasFor(annotation = KafkaListener.class, attribute = "id")
String id();
@AliasFor(annotation = KafkaListener.class, attribute = "topics")
String[] topics();
@AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
String concurrency() default "3";
}
```
你必须至少别名`topics`、`topicPattern`或`topicPartitions`中的一个(并且,通常是`id`或`groupId`,除非你在消费者工厂配置中指定了`group.id`)。下面的示例展示了如何做到这一点:
```
@MyThreeConsumersListener(id = "my.group", topics = "my.topic")
public void listen1(String in) {
...
}
```
##### 在类上`@KafkaListener`
在类级别上使用`@KafkaListener`时,必须在方法级别上指定`@KafkaHandler`。在发送消息时,将使用转换后的消息有效负载类型来确定调用哪个方法。下面的示例展示了如何做到这一点:
```
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String foo) {
...
}
@KafkaHandler
public void listen(Integer bar) {
...
}
@KafkaHandler(isDefault = true)
public void listenDefault(Object object) {
...
}
}
```
从版本 2.1.3 开始,你可以将`@KafkaHandler`方法指定为默认方法,如果其他方法不匹配,则调用该方法。最多只能指定一种方法。当使用`@KafkaHandler`方法时,有效负载必须已经转换为域对象(因此可以执行匹配)。使用自定义的反序列化器,`JsonDeserializer`,或`JsonMessageConverter`,其`TypePrecedence`设置为`TYPE_ID`。有关更多信息,请参见[序列化、反序列化和消息转换](#serdes)。
| |由于 Spring 解析方法参数的方式的某些限制,默认的`@KafkaHandler`不能接收离散的头;它必须使用`ConsumerRecordMetadata`中讨论的[消费者记录元数据](#consumer-record-metadata)。|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
例如:
```
@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
...
}
```
如果对象是`String`,这将不起作用;`topic`参数还将获得对`object`的引用。
如果在默认方法中需要有关记录的元数据,请使用以下方法:
```
@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
String topic = meta.topic();
...
}
```
##### `topic`属性修改
从版本 2.7.2 开始,你现在可以在创建容器之前以编程方式修改注释属性。为此,将一个或多个`KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer`添加到应用程序上下文。`AnnotationEnhancer`是一个`BiFunction