kafka.md 34.7 KB
Newer Older
dallascao's avatar
dallascao 已提交
1 2
# Apache Kafka 支持

3
## Apache Kafka 支持
dallascao's avatar
dallascao 已提交
4

5
### 概述
dallascao's avatar
dallascao 已提交
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40

Spring Apache Kafka 的集成基于[Spring for Apache Kafka project](https://projects.spring.io/spring-kafka/)

你需要在项目中包含此依赖项:

Maven

```
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>5.5.9</version>
</dependency>
```

Gradle

```
compile "org.springframework.integration:spring-integration-kafka:5.5.9"
```

它提供了以下组件:

* [出站通道适配器](#kafka-outbound)

* [消息驱动通道适配器](#kafka-inbound)

* [入站通道适配器](#kafka-inbound-pollable)

* [出站网关](#kafka-outbound-gateway)

* [入站网关](#kafka-inbound-gateway)

* [Apache Kafka 主题支持的频道](#kafka-channels)

41
### 出站通道适配器
dallascao's avatar
dallascao 已提交
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82

出站通道适配器用于将消息从 Spring 集成通道发布到 Apache Kafka 主题。通道在应用程序上下文中定义,然后连接到将消息发送到 Apache Kafka 的应用程序中。发送方应用程序可以通过使用 Spring 集成消息发布到 Apache Kafka,这些消息由出站通道适配器在内部转换为 Kafka 记录,如下所示:

* Spring 集成消息的有效负载用于填充 Kafka 记录的有效负载。

* 默认情况下, Spring 集成消息的`kafka_messageKey`头用于填充 Kafka 记录的键。

你可以分别通过`kafka_topic``kafka_partitionId`标题定制用于发布消息的目标主题和分区。

此外,`<int-kafka:outbound-channel-adapter>`提供了通过在出站消息上应用 SPEL 表达式来提取密钥、目标主题和目标分区的能力。为此,它支持三对相互排斥的属性:

* `topic``topic-expression`

* `message-key``message-key-expression`

* `partition-id``partition-id-expression`

它们允许你分别将`topic``message-key``partition-id`指定为适配器上的静态值,或者在运行时根据请求消息动态计算它们的值。

|   |`KafkaHeaders`接口(由`spring-kafka`提供)包含用于与<br/>头部交互的常量。<br/>`messageKey``topic`默认头部现在需要一个`kafka_`前缀。<br/>从使用旧头部的早期版本迁移时,你需要在`<int-kafka:outbound-channel-adapter>`上指定`message-key-expression="headers['messageKey']"``topic-expression="headers['topic']"`<br/>或者,你可以通过使用`<header-enricher>``MessageBuilder`将上游的头更改为新的头,如果你使用常量值,你还可以使用`topic``message-key`在适配器上配置它们。|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

注意:如果适配器配置了主题或消息键(使用常量或表达式),则使用这些选项,并忽略相应的头。如果希望标头覆盖配置,则需要在表达式中对其进行配置,如以下所示:

```
topic-expression="headers['topic'] != null ? headers['topic'] : 'myTopic'"
```

适配器需要`KafkaTemplate`,而这又需要适当配置的`KafkaProducerFactory`

如果提供了`send-failure-channel``sendFailureChannel`)并接收到发送失败(同步或异步),则将向通道发送`ErrorMessage`。有效负载是一个`KafkaSendFailureException`,带有`failedMessage``record`(the`ProducerRecord`)和`cause`属性。通过设置`error-message-strategy`属性,可以覆盖`DefaultErrorMessageStrategy`

如果提供了`send-success-channel``sendSuccessChannel`),则在成功发送后将发送有效负载类型为`org.apache.kafka.clients.producer.RecordMetadata`的消息。

|   |如果你的应用程序使用事务,并且在由侦听器容器启动事务的地方使用相同的通道适配器来发布消息,以及在不存在事务的地方发布消息,你必须在`KafkaTemplate`上配置`transactionIdPrefix`以覆盖容器或事务管理器使用的前缀。<br/>容器发起的事务(生产者工厂或事务管理器属性)使用的前缀必须在所有应用程序实例上都相同。<br/>使用的前缀对于仅用于生产者的事务,必须在所有应用程序实例上都是唯一的。|
|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

你可以配置一个`flushExpression`,它必须解析为布尔值。如果你使用`linger.ms``batch.size`Kafka Producer 属性,则在发送多条消息后进行刷新可能会很有用;在最后一条消息上,表达式应该计算为`Boolean.TRUE`,并且将立即发送一个不完整的批处理。默认情况下,表达式在`KafkaIntegrationHeaders.FLUSH`报头(`kafka_flush`)中查找`Boolean`值。如果值`true`而不是`false`或者不存在标头,就会发生刷新。

`KafkaProducerMessageHandler.sendTimeoutExpression`默认值已从 10 秒更改为`delivery.timeout.ms`Kafka Producer 属性`+ 5000`,以便将超时后的实际 Kafka 错误传播到应用程序,而不是由此框架生成的超时。为了保持一致性,对此进行了更改,因为你可能会遇到意外的行为( Spring 可能会超时发送,而实际上最终是成功的)。重要提示:默认情况下,超时时间为 120 秒,因此你可能希望减少超时时间,以获得更多的及时故障。

83
#### Java 配置
dallascao's avatar
dallascao 已提交
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113

下面的示例展示了如何使用 Java 为 Apache Kafka 配置出站通道适配器:

```
@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler handler() throws Exception {
    KafkaProducerMessageHandler<String, String> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setTopicExpression(new LiteralExpression("someTopic"));
    handler.setMessageKeyExpression(new LiteralExpression("someKey"));
    handler.setSuccessChannel(successes());
    handler.setFailureChannel(failures());
    return handler;
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
    // set more properties
    return new DefaultKafkaProducerFactory<>(props);
}
```

114
#### Java DSL 配置
dallascao's avatar
dallascao 已提交
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158

下面的示例展示了如何使用 Spring 集成 Java DSL 为 Apache Kafka 配置出站通道适配器:

```
@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
}

@Bean
public IntegrationFlow sendToKafkaFlow() {
    return f -> f
            .<String>split(p -> Stream.generate(() -> p).limit(101).iterator(), null)
            .publishSubscribeChannel(c -> c
                    .subscribe(sf -> sf.handle(
                            kafkaMessageHandler(producerFactory(), TEST_TOPIC1)
                                    .timestampExpression("T(Long).valueOf('1487694048633')"),
                            e -> e.id("kafkaProducer1")))
                    .subscribe(sf -> sf.handle(
                            kafkaMessageHandler(producerFactory(), TEST_TOPIC2)
                                   .timestamp(m -> 1487694048644L),
                            e -> e.id("kafkaProducer2")))
            );
}

@Bean
public DefaultKafkaHeaderMapper mapper() {
    return new DefaultKafkaHeaderMapper();
}

private KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandler(
        ProducerFactory<Integer, String> producerFactory, String topic) {
    return Kafka
            .outboundChannelAdapter(producerFactory)
            .messageKey(m -> m
                    .getHeaders()
                    .get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
            .headerMapper(mapper())
            .partitionId(m -> 10)
            .topicExpression("headers[kafka_topic] ?: '" + topic + "'")
            .configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
}
```

159
#### XML 配置
dallascao's avatar
dallascao 已提交
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190

下面的示例展示了如何使用 XML 配置 Kafka 出站通道适配器:

```
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-template="template"
                                    auto-startup="false"
                                    channel="inputToKafka"
                                    topic="foo"
                                    sync="false"
                                    message-key-expression="'bar'"
                                    send-failure-channel="failures"
                                    send-success-channel="successes"
                                    error-message-strategy="ems"
                                    partition-id-expression="2">
</int-kafka:outbound-channel-adapter>

<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    ... <!-- more producer properties -->
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
</bean>
```

191
### 消息驱动通道适配器
dallascao's avatar
dallascao 已提交
192 193 194 195 196 197 198 199 200 201 202 203 204 205

`KafkaMessageDrivenChannelAdapter``<int-kafka:message-driven-channel-adapter>`)使用`spring-kafka``KafkaMessageListenerContainer``ConcurrentListenerContainer`

而且`mode`属性也是可用的。它可以接受`record``batch`的值(默认:`record`)。对于`record`模式,每个消息有效负载都是从单个`ConsumerRecord`转换而来的。对于`batch`模式,有效负载是由消费者投票返回的所有`ConsumerRecord`实例转换的对象列表。与批处理的`@KafkaListener`一样,`KafkaHeaders.RECEIVED_MESSAGE_KEY``KafkaHeaders.RECEIVED_PARTITION_ID``KafkaHeaders.RECEIVED_TOPIC``KafkaHeaders.OFFSET`标题也是列表,其位置与有效载荷中的位置相对应。

接收到的消息有特定的标题填充。有关更多信息,请参见[`KafkaHeaders`类](https://DOCS. Spring.io/ Spring-kafka/api/org/springframework/kafka/support/kafkaheaders.html)。

|   |`Consumer`对象(在`kafka_consumer`标头中)不是线程安全的。<br/>你必须仅在调用适配器中的侦听器的线程上调用它的方法。<br/>如果将消息传递给另一个线程,则不能调用它的方法。|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

当提供了`retry-template`时,将根据其重试策略重试交付失败。在这种情况下,`error-channel`是不允许的。当重试用完时,可以使用`recovery-callback`来处理错误。在大多数情况下,这是一个`ErrorMessageSendingRecoverer`,它将`ErrorMessage`发送到一个通道。

在构建`ErrorMessage`(用于`error-channel``recovery-callback`)时,可以通过设置`error-message-strategy`属性来定制错误消息。默认情况下,将使用`RawRecordHeaderErrorMessageStrategy`,以提供对转换后的消息以及原始`ConsumerRecord`的访问。

206
#### Java 配置
dallascao's avatar
dallascao 已提交
207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235

下面的示例展示了如何使用 Java 配置消息驱动的通道适配器:

```
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
            adapter(KafkaMessageListenerContainer<String, String> container) {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
            new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
    kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
    return kafkaMessageDrivenChannelAdapter;
}

@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
    ContainerProperties properties = new ContainerProperties(this.topic);
    // set more properties
    return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
    // set more properties
    return new DefaultKafkaConsumerFactory<>(props);
}
```

236
#### Java DSL 配置
dallascao's avatar
dallascao 已提交
237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279

下面的示例展示了如何使用 Spring 集成 Java DSL 配置消息驱动通道适配器:

```
@Bean
public IntegrationFlow topic1ListenerFromKafkaFlow() {
    return IntegrationFlows
            .from(Kafka.messageDrivenChannelAdapter(consumerFactory(),
                    KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC1)
                    .configureListenerContainer(c ->
                            c.ackMode(AbstractMessageListenerContainer.AckMode.MANUAL)
                                    .id("topic1ListenerContainer"))
                    .recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel(),
                            new RawRecordHeaderErrorMessageStrategy()))
                    .retryTemplate(new RetryTemplate())
                    .filterInRetry(true))
            .filter(Message.class, m ->
                            m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer.class) < 101,
                    f -> f.throwExceptionOnRejection(true))
            .<String, String>transform(String::toUpperCase)
            .channel(c -> c.queue("listeningFromKafkaResults1"))
            .get();
}
```

还可以使用用于`@KafkaListener`注释的容器工厂为其他目的创建`ConcurrentMessageListenerContainer`实例。有关示例,请参见[the Spring for Apache Kafka documentation](https://docs.spring.io/spring-kafka/docs/current/reference/html/)

对于 Java DSL,容器不必配置为`@Bean`,因为 DSL 将容器注册为 Bean。下面的示例展示了如何做到这一点:

```
@Bean
public IntegrationFlow topic2ListenerFromKafkaFlow() {
    return IntegrationFlows
            .from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainerFactory().createContainer(TEST_TOPIC2),
            KafkaMessageDrivenChannelAdapter.ListenerMode.record)
                .id("topic2Adapter"))
            ...
            get();
}
```

注意,在这种情况下,适配器被赋予`id``topic2Adapter`)。容器以`topic2Adapter.container`的名称注册在应用程序上下文中。如果适配器没有`id`属性,则容器的 Bean 名称是容器的完全限定类名称加上`#n`,其中`n`对每个容器递增。

280
#### XML 配置
dallascao's avatar
dallascao 已提交
281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317

下面的示例展示了如何使用 XML 配置消息驱动的通道适配器:

```
<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        mode="record"
        retry-template="template"
        recovery-callback="callback"
        error-message-strategy="ems"
        channel="someChannel"
        error-channel="errorChannel" />

<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                <entry key="bootstrap.servers" value="localhost:9092" />
                ...
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.kafka.listener.config.ContainerProperties">
            <constructor-arg name="topics" value="foo" />
        </bean>
    </constructor-arg>

</bean>
```

318
### 入站通道适配器
dallascao's avatar
dallascao 已提交
319 320 321

`KafkaMessageSource`提供了一个可匹配的通道适配器实现。

322
#### Java 配置
dallascao's avatar
dallascao 已提交
323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343

```
@InboundChannelAdapter(channel = "fromKafka", poller = @Poller(fixedDelay = "5000"))
@Bean
public KafkaMessageSource<String, String> source(ConsumerFactory<String, String> cf)  {
    KafkaMessageSource<String, String> source = new KafkaMessageSource<>(cf, "myTopic");
    source.setGroupId("myGroupId");
    source.setClientId("myClientId");
    return source;
}
```

请参考 Javadocs 以获得可用的属性。

默认情况下,`max.poll.records`必须在消费工厂中显式设置,或者如果消费工厂是`DefaultKafkaConsumerFactory`,则将强制设置为 1。可以将属性`allowMultiFetch`设置为`true`以覆盖此行为。

|   |你必须在`max.poll.interval.ms`内轮询消费者以避免重新平衡。<br/>如果你将`allowMultiFetch`设置为`true`,你必须处理所有检索到的记录,并在`max.poll.interval.ms`内再次轮询。|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

此适配器发出的消息包含一个标题`kafka_remainingRecords`,其中包含上一个轮询中剩余的记录的计数。

344
#### Java DSL 配置
dallascao's avatar
dallascao 已提交
345 346 347 348 349 350 351 352 353 354 355

```
@Bean
public IntegrationFlow flow(ConsumerFactory<String, String> cf)  {
    return IntegrationFlows.from(Kafka.inboundChannelAdapter(cf, "myTopic")
                .groupId("myDslGroupId"), e -> e.poller(Pollers.fixedDelay(5000)))
            .handle(System.out::println)
            .get();
}
```

356
#### XML 配置
dallascao's avatar
dallascao 已提交
357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375

```
<int-kafka:inbound-channel-adapter
        id="adapter1"
        consumer-factory="consumerFactory"
        ack-factory="ackFactory"
        topics="topic1"
        channel="inbound"
        client-id="client"
        group-id="group"
        message-converter="converter"
        payload-type="java.lang.String"
        raw-header="true"
        auto-startup="false"
        rebalance-listener="rebal">
    <int:poller fixed-delay="5000"/>
</int-kafka:inbound-channel-adapter>
```

376
### 出站网关
dallascao's avatar
dallascao 已提交
377 378 379 380 381 382 383 384

出站网关用于请求/回复操作。它与大多数 Spring 集成网关的不同之处在于,发送线程不会在网关中阻塞,而应答将在应答侦听器容器线程上进行处理。如果你的代码调用同步[消息传递网关](https://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#gateway)后面的网关,则用户线程将在此阻塞,直到收到答复(或者发生超时)。

|   |网关不接受请求,直到应答容器分配了它的主题和分区。<br/>建议你在模板的应答容器属性中添加`ConsumerRebalanceListener`,并在向网关发送消息之前等待`onPartitionsAssigned`调用。|
|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

`KafkaProducerMessageHandler``sendTimeoutExpression`默认值是`delivery.timeout.ms`Kafka Producer 属性`+ 5000`,这样超时后的实际 Kafka 错误将传播到应用程序,而不是由该框架生成的超时。为了保持一致性,对此进行了更改,因为你可能会遇到意外的行为( Spring 可能会超时发送,而实际上最终是成功的)。重要提示:默认情况下,超时时间为 120 秒,因此你可能希望减少超时时间,以获得更多的及时故障。

385
#### Java 配置
dallascao's avatar
dallascao 已提交
386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409

下面的示例展示了如何使用 Java 配置网关:

```
@Bean
@ServiceActivator(inputChannel = "kafkaRequests", outputChannel = "kafkaReplies")
public KafkaProducerMessageHandler<String, String> outGateway(
        ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
    return new KafkaProducerMessageHandler<>(kafkaTemplate);
}
```

请参考 Javadocs 以获得可用的属性。

请注意,使用了与[出站通道适配器](#kafka-outbound)相同的类,唯一的区别是传递到构造函数的`KafkaTemplate``ReplyingKafkaTemplate`。有关更多信息,请参见[the Spring for Apache Kafka documentation](https://docs.spring.io/spring-kafka/docs/current/reference/html/)

出站主题、分区、键等都是以与出站适配器相同的方式确定的。答复题目确定如下:

1. 一个名为`KafkaHeaders.REPLY_TOPIC`的消息头(如果存在,它必须具有`String``byte[]`值)将根据模板的回复容器的订阅主题进行验证。

2. 如果模板的`replyContainer`仅订阅了一个主题,则使用该主题。

还可以指定`KafkaHeaders.REPLY_PARTITION`头,以确定用于答复的特定分区。同样,这是根据模板的回复容器的订阅进行验证的。

410
#### Java DSL 配置
dallascao's avatar
dallascao 已提交
411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438

下面的示例展示了如何使用 Java DSL 配置出站网关:

```
@Bean
public IntegrationFlow outboundGateFlow(
        ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {

    return IntegrationFlows.from("kafkaRequests")
            .handle(Kafka.outboundGateway(kafkaTemplate))
            .channel("kafkaReplies")
            .get();
}
```

或者,也可以使用类似于以下 Bean 的配置:

```
@Bean
public IntegrationFlow outboundGateFlow() {
    return IntegrationFlows.from("kafkaRequests")
            .handle(Kafka.outboundGateway(producerFactory(), replyContainer())
                .configureKafkaTemplate(t -> t.replyTimeout(30_000)))
            .channel("kafkaReplies")
            .get();
}
```

439
#### XML 配置
dallascao's avatar
dallascao 已提交
440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460

```
<int-kafka:outbound-gateway
    id="allProps"
    error-message-strategy="ems"
    kafka-template="template"
    message-key-expression="'key'"
    order="23"
    partition-id-expression="2"
    reply-channel="replies"
    reply-timeout="43"
    request-channel="requests"
    requires-reply="false"
    send-success-channel="successes"
    send-failure-channel="failures"
    send-timeout-expression="44"
    sync="true"
    timestamp-expression="T(System).currentTimeMillis()"
    topic-expression="'topic'"/>
```

461
### 入站网关
dallascao's avatar
dallascao 已提交
462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514

入站网关用于请求/回复操作。

下面的示例展示了如何使用 Java 配置入站网关:

```
@Bean
public KafkaInboundGateway<Integer, String, String> inboundGateway(
        AbstractMessageListenerContainer<Integer, String>container,
        KafkaTemplate<Integer, String> replyTemplate) {

    KafkaInboundGateway<Integer, String, String> gateway =
        new KafkaInboundGateway<>(container, replyTemplate);
    gateway.setRequestChannel(requests);
    gateway.setReplyChannel(replies);
    gateway.setReplyTimeout(30_000);
    return gateway;
}
```

请参考 Javadocs 以获得可用的属性。

下面的示例展示了如何使用 Java DSL 配置一个简单的大写转换器:

```
@Bean
public IntegrationFlow serverGateway(
        ConcurrentMessageListenerContainer<Integer, String> container,
        KafkaTemplate<Integer, String> replyTemplate) {
    return IntegrationFlows
            .from(Kafka.inboundGateway(container, replyTemplate)
                .replyTimeout(30_000))
            .<String, String>transform(String::toUpperCase)
            .get();
}
```

或者,你可以通过使用类似于以下代码来配置大写转换器:

```
@Bean
public IntegrationFlow serverGateway() {
    return IntegrationFlows
            .from(Kafka.inboundGateway(consumerFactory(), containerProperties(),
                    producerFactory())
                .replyTimeout(30_000))
            .<String, String>transform(String::toUpperCase)
            .get();
}
```

还可以使用用于`@KafkaListener`注释的容器工厂为其他目的创建`ConcurrentMessageListenerContainer`实例。有关示例,请参见[the Spring for Apache Kafka documentation](https://docs.spring.io/spring-kafka/docs/current/reference/html/)[消息驱动通道适配器](#kafka-inbound)

515
#### XML 配置
dallascao's avatar
dallascao 已提交
516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536

```
<int-kafka:inbound-gateway
        id="gateway1"
        listener-container="container1"
        kafka-template="template"
        auto-startup="false"
        phase="100"
        request-timeout="5000"
        request-channel="nullChannel"
        reply-channel="errorChannel"
        reply-timeout="43"
        message-converter="messageConverter"
        payload-type="java.lang.String"
        error-message-strategy="ems"
        retry-template="retryTemplate"
        recovery-callback="recoveryCallback"/>
```

有关每个属性的描述,请参见 XML 模式。

537
### 由 Apache Kafka 主题支持的通道
dallascao's avatar
dallascao 已提交
538 539 540 541 542

Spring 集成具有`MessageChannel`实现,该实现由用于持久性的 Apache Kafka 主题支持。

每个通道都需要一个`KafkaTemplate`用于发送端,或者一个侦听器容器工厂(用于可订阅通道),或者一个`KafkaMessageSource`用于可检索通道。

543
#### Java DSL 配置
dallascao's avatar
dallascao 已提交
544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591

```
@Bean
public IntegrationFlow flowWithSubscribable(KafkaTemplate<Integer, String> template,
        ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {

    return IntegrationFlows.from(...)
            ...
            .channel(Kafka.channel(template, containerFactory, "someTopic1").groupId("group1"))
            ...
            .get();
}

@Bean
public IntegrationFlow flowWithPubSub(KafkaTemplate<Integer, String> template,
        ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {

    return IntegrationFlows.from(...)
            ...
            .publishSubscribeChannel(pubSub(template, containerFactory),
                pubsub -> pubsub
                            .subscribe(subflow -> ...)
                            .subscribe(subflow -> ...))
            .get();
}

@Bean
public BroadcastCapableChannel pubSub(KafkaTemplate<Integer, String> template,
        ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {

    return Kafka.publishSubscribeChannel(template, containerFactory, "someTopic2")
            .groupId("group2")
            .get();
}

@Bean
public IntegrationFlow flowWithPollable(KafkaTemplate<Integer, String> template,
        KafkaMessageSource<Integer, String> source) {

    return IntegrationFlows.from(...)
            ...
            .channel(Kafka.pollableChannel(template, source, "someTopic3").groupId("group3"))
            .handle(...,  e -> e.poller(...))
            ...
            .get();
}
```

592
#### Java 配置
dallascao's avatar
dallascao 已提交
593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634

```
/**
 * Channel for a single subscriber.
 **/
@Bean
SubscribableKafkaChannel pointToPoint(KafkaTemplate<String, String> template,
    KafkaListenerContainerFactory<String, String> factory)

    SubscribableKafkaChannel channel =
        new SubscribableKafkaChannel(template, factory, "topicA");
    channel.setGroupId("group1");
    return channel;
}

/**
 * Channel for multiple subscribers.
 **/
@Bean
SubscribableKafkaChannel pubsub(KafkaTemplate<String, String> template,
    KafkaListenerContainerFactory<String, String> factory)

    SubscribableKafkaChannel channel =
        new SubscribableKafkaChannel(template, factory, "topicB", true);
    channel.setGroupId("group2");
    return channel;
}

/**
 * Pollable channel (topic is configured on the source)
 **/
@Bean
PollableKafkaChannel pollable(KafkaTemplate<String, String> template,
    KafkaMessageSource<String, String> source)

    PollableKafkaChannel channel =
        new PollableKafkaChannel(template, source);
    channel.setGroupId("group3");
    return channel;
}
```

635
#### XML 配置
dallascao's avatar
dallascao 已提交
636 637 638 639 640 641 642 643 644 645 646 647

```
<int-kafka:channel kafka-template="template" id="ptp" topic="ptpTopic" group-id="ptpGroup"
    container-factory="containerFactory" />

<int-kafka:pollable-channel kafka-template="template" id="pollable" message-source="source"
    group-id = "pollableGroup"/>

<int-kafka:publish-subscribe-channel kafka-template="template" id="pubSub" topic="pubSubTopic"
    group-id="pubSubGroup" container-factory="containerFactory" />
```

648
### 消息转换
dallascao's avatar
dallascao 已提交
649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684

a`StringJsonMessageConverter`。有关更多信息,请参见[the Spring for Apache Kafka documentation](https://docs.spring.io/spring-kafka/docs/current/reference/html/)

当将此转换器与消息驱动通道适配器一起使用时,你可以指定要将传入的有效负载转换为哪种类型。这是通过在适配器上设置`payload-type`属性(`payloadType`属性)来实现的。下面的示例展示了如何在 XML 配置中实现这一点:

```
<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        channel="nullChannel"
        message-converter="messageConverter"
        payload-type="com.example.Foo"
        error-channel="errorChannel" />

<bean id="messageConverter"
    class="org.springframework.kafka.support.converter.MessagingMessageConverter"/>
```

下面的示例展示了如何在 Java 配置中设置适配器上的`payload-type`属性(`payloadType`属性):

```
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
            adapter(KafkaMessageListenerContainer<String, String> container) {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
            new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
    kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
    kafkaMessageDrivenChannelAdapter.setMessageConverter(converter());
    kafkaMessageDrivenChannelAdapter.setPayloadType(Foo.class);
    return kafkaMessageDrivenChannelAdapter;
}
```

685
### 空有效载荷和日志压缩“墓碑”记录
dallascao's avatar
dallascao 已提交
686 687 688 689 690 691 692 693 694 695 696 697 698 699

Spring 消息传递`Message<?>`对象不能具有`null`有效负载。当你使用 Apache Kafka 的端点时,`null`有效载荷(也称为 Tombstone 记录)由类型为`KafkaNull`的有效载荷表示。有关更多信息,请参见[the Spring for Apache Kafka documentation](https://docs.spring.io/spring-kafka/docs/current/reference/html/)

Spring 集成端点的 POJO 方法可以使用真正的`null`值,而不是`KafkaNull`值。要这样做,用`@Payload(required = false)`标记参数。下面的示例展示了如何做到这一点:

```
@ServiceActivator(inputChannel = "fromSomeKafkaInboundEndpoint")
public void in(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
               @Payload(required = false) Customer customer) {
    // customer is null if a tombstone record
    ...
}
```

700
### 从`KStream`调用 Spring 集成流
dallascao's avatar
dallascao 已提交
701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738

你可以使用`MessagingTransformer``KStream`调用集成流:

```
@Bean
public KStream<byte[], byte[]> kStream(StreamsBuilder kStreamBuilder,
        MessagingTransformer<byte[], byte[], byte[]> transformer)  transformer) {
    KStream<byte[], byte[]> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    stream.mapValues((ValueMapper<byte[], byte[]>) String::toUpperCase)
            ...
            .transform(() -> transformer)
            .to(streamingTopic2);

    stream.print(Printed.toSysOut());

    return stream;
}

@Bean
@DependsOn("flow")
public MessagingTransformer<byte[], byte[], String> transformer(
        MessagingFunction function) {

    MessagingMessageConverter converter = new MessagingMessageConverter();
    converter.setHeaderMapper(new SimpleKafkaHeaderMapper("*"));
    return new MessagingTransformer<>(function, converter);
}

@Bean
public IntegrationFlow flow() {
    return IntegrationFlows.from(MessagingFunction.class)
        ...
        .get();
}
```

当一个集成流从一个接口开始时,所创建的代理具有该流的名称 Bean,并附加了“.gateway”,因此如果需要,这个 Bean 名称可以使用 a`@Qualifier`

739
### 读/处理/写场景的性能注意事项
dallascao's avatar
dallascao 已提交
740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803

许多应用程序使用一个主题,执行一些处理并写入另一个主题。在大多数情况下,如果写入失败,应用程序将希望抛出一个异常,以便可以重试传入的请求并/或将其发送到一个死信主题。该功能由底层消息侦听器容器以及适当配置的错误处理程序支持。然而,为了支持这一点,我们需要阻塞侦听器线程,直到写操作成功(或失败),以便可以将任何异常抛到容器中。当使用单个记录时,可以通过在出站适配器上设置`sync`属性来实现。然而,当使用批处理时,使用`sync`会导致显著的性能下降,因为应用程序将在发送下一条消息之前等待每次发送的结果。你还可以执行多个发送,然后等待这些发送的结果。这是通过向消息处理程序添加`futuresChannel`来实现的。要启用该功能,请将`KafkaIntegrationHeaders.FUTURE_TOKEN`添加到出站消息中;然后可以使用此功能将`Future`关联到特定的已发送消息。下面是一个如何使用此功能的示例:

```
@SpringBootApplication
public class FuturesChannelApplication {

    public static void main(String[] args) {
        SpringApplication.run(FuturesChannelApplication.class, args);
    }

    @Bean
    IntegrationFlow inbound(ConsumerFactory<String, String> consumerFactory, Handler handler) {
        return IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(consumerFactory,
                    ListenerMode.batch, "inTopic"))
                .handle(handler)
                .get();
    }

    @Bean
    IntegrationFlow outbound(KafkaTemplate<String, String> kafkaTemplate) {
        return IntegrationFlows.from(Gate.class)
                .enrichHeaders(h -> h
                        .header(KafkaHeaders.TOPIC, "outTopic")
                        .headerExpression(KafkaIntegrationHeaders.FUTURE_TOKEN, "headers[id]"))
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                        .futuresChannel("futures"))
                .get();
    }

    @Bean
    PollableChannel futures() {
        return new QueueChannel();
    }

}

@Component
@DependsOn("outbound")
class Handler {

    @Autowired
    Gate gate;

    @Autowired
    PollableChannel futures;

    public void handle(List<String> input) throws Exception {
        System.out.println(input);
        input.forEach(str -> this.gate.send(str.toUpperCase()));
        for (int i = 0; i < input.size(); i++) {
            Message<?> future = this.futures.receive(10000);
            ((Future<?>) future.getPayload()).get(10, TimeUnit.SECONDS);
        }
    }

}

interface Gate {

    void send(String out);

}
```