# Apache Kafka Support

### Overview

Spring Integration for Apache Kafka is based on the [Spring for Apache Kafka project](https://projects.spring.io/spring-kafka/).

You need to include this dependency in your project:

Maven

```
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>5.5.9</version>
</dependency>
```

Gradle

```
compile "org.springframework.integration:spring-integration-kafka:5.5.9"
```

It provides the following components:

* [Outbound Channel Adapter](#kafka-outbound)

* [Message-driven Channel Adapter](#kafka-inbound)

* [Inbound Channel Adapter](#kafka-inbound-pollable)

* [Outbound Gateway](#kafka-outbound-gateway)

* [Inbound Gateway](#kafka-inbound-gateway)

* [Channels Backed by Apache Kafka Topics](#kafka-channels)

### Outbound Channel Adapter

The outbound channel adapter is used to publish messages from a Spring Integration channel to Apache Kafka topics.
The channel is defined in the application context and then wired into the application that sends messages to Apache Kafka.
Sender applications can publish to Apache Kafka by using Spring Integration messages, which are internally converted to Kafka records by the outbound channel adapter, as follows:

* The payload of the Spring Integration message is used to populate the payload of the Kafka record.

* By default, the `kafka_messageKey` header of the Spring Integration message is used to populate the key of the Kafka record.

You can customize the target topic and partition for publishing the message through the `kafka_topic` and `kafka_partitionId` headers, respectively.
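
For example, a message built as follows (a minimal sketch; the `toKafka` channel and the literal values are assumptions) is routed by those headers:

```
Message<String> message = MessageBuilder.withPayload("someValue")
        .setHeader(KafkaHeaders.TOPIC, "someTopic")        // kafka_topic
        .setHeader(KafkaHeaders.MESSAGE_KEY, "someKey")    // kafka_messageKey
        .setHeader(KafkaHeaders.PARTITION_ID, 0)           // kafka_partitionId
        .build();
toKafka.send(message);
```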

In addition, the `<int-kafka:outbound-channel-adapter>` provides the ability to extract the key, target topic, and target partition by applying SpEL expressions on the outbound message.
To that end, it supports three mutually exclusive pairs of attributes:

* `topic` and `topic-expression`

* `message-key` and `message-key-expression`

* `partition-id` and `partition-id-expression`

These let you specify `topic`, `message-key`, and `partition-id`, respectively, either as static values on the adapter or as SpEL expressions that are evaluated at runtime against the request message.

|   |The `KafkaHeaders` interface (provided by `spring-kafka`) contains constants used for interacting with<br/>headers.<br/>The `messageKey` and `topic` default headers now require a `kafka_` prefix.<br/>When migrating from an earlier version that used the old headers, you need to specify `message-key-expression="headers['messageKey']"` and `topic-expression="headers['topic']"` on the `<int-kafka:outbound-channel-adapter>`.<br/>Alternatively, you can change the headers upstream to the new headers from `KafkaHeaders` by using a `<header-enricher>` or a `MessageBuilder`.<br/>If you use constant values, you can also configure them on the adapter by using `topic` and `message-key`.|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

NOTE: If the adapter is configured with a topic or message key (either with a constant or an expression), that configured value is used and the corresponding header is ignored.
If you wish the header to override the configuration, you need to configure it in an expression, such as the following:

```
topic-expression="headers['topic'] != null ? headers['topic'] : 'myTopic'"
```

The adapter requires a `KafkaTemplate`, which, in turn, requires a suitably configured `ProducerFactory`.

If a `send-failure-channel` (`sendFailureChannel`) is provided and a send failure (sync or async) is received, an `ErrorMessage` is sent to the channel.
The payload is a `KafkaSendFailureException` with `failedMessage`, `record` (the `ProducerRecord`) and `cause` properties.
You can override the `DefaultErrorMessageStrategy` by setting the `error-message-strategy` property.

If a `send-success-channel` (`sendSuccessChannel`) is provided, a message with a payload of type `org.apache.kafka.clients.producer.RecordMetadata` is sent after a successful send.
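
The following sketch (channel names and bean wiring are assumptions) shows service activators consuming from those two channels:

```
@ServiceActivator(inputChannel = "failures")
public void onFailure(ErrorMessage message) {
    KafkaSendFailureException ex = (KafkaSendFailureException) message.getPayload();
    Message<?> failed = ex.getFailedMessage();    // the Spring Integration message
    ProducerRecord<?, ?> record = ex.getRecord(); // the record that could not be sent
    // log, alert, or re-route as appropriate
}

@ServiceActivator(inputChannel = "successes")
public void onSuccess(Message<RecordMetadata> message) {
    RecordMetadata meta = message.getPayload();
    // meta.topic(), meta.partition(), and meta.offset() identify the acknowledged record
}
```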

|   |If your application uses transactions and the same channel adapter is used to publish messages where the transaction is started by a listener container, as well as publishing where there is no existing transaction, you must configure a `transactionIdPrefix` on the `KafkaTemplate` to override the prefix used by the container or transaction manager.<br/>The prefix used by container-initiated transactions (the producer factory or transaction manager property) must be the same on all application instances.<br/>The prefix used for producer-only transactions must be unique on all application instances.|
|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

You can configure a `flushExpression`, which must resolve to a boolean value.
Flushing after sending several messages might be useful if you are using the `linger.ms` and `batch.size` Kafka producer properties; the expression should evaluate to `Boolean.TRUE` on the last message, so that an incomplete batch is sent immediately.
By default, the expression looks for a `Boolean` value in the `KafkaIntegrationHeaders.FLUSH` header (`kafka_flush`).
The flush occurs if the value is `true`, but not if it is `false` or the header is absent.
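
The following sketch (the `toKafka` channel is an assumption) requests a flush on the final message of a batch:

```
Message<String> last = MessageBuilder.withPayload("lastValue")
        .setHeader(KafkaIntegrationHeaders.FLUSH, Boolean.TRUE) // the kafka_flush header
        .build();
toKafka.send(last);
```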

The `KafkaProducerMessageHandler.sendTimeoutExpression` default has changed from 10 seconds to the `delivery.timeout.ms` Kafka producer property `+ 5000` so that the actual Kafka error after a timeout is propagated to the application, instead of a timeout generated by this framework.
This has been changed for consistency, because otherwise you may get unexpected behavior (Spring may time out the send while it is actually, eventually, successful).
IMPORTANT: That timeout is 120 seconds by default, so you may wish to reduce it to get more timely failures.
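
For example, a sketch of reducing that timeout on the producer factory (the 30-second value is an arbitrary assumption):

```
@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 30_000); // fail faster than the 120s default
    return new DefaultKafkaProducerFactory<>(props);
}
```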

#### Java Configuration

The following example shows how to configure the outbound channel adapter for Apache Kafka with Java:

```
@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler handler() throws Exception {
    KafkaProducerMessageHandler<String, String> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setTopicExpression(new LiteralExpression("someTopic"));
    handler.setMessageKeyExpression(new LiteralExpression("someKey"));
    handler.setSuccessChannel(successes());
    handler.setFailureChannel(failures());
    return handler;
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
    // set more properties
    return new DefaultKafkaProducerFactory<>(props);
}
```

#### Java DSL Configuration

The following example shows how to configure the outbound channel adapter for Apache Kafka with Spring Integration Java DSL:

```
@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
}

@Bean
public IntegrationFlow sendToKafkaFlow() {
    return f -> f
            .<String>split(p -> Stream.generate(() -> p).limit(101).iterator(), null)
            .publishSubscribeChannel(c -> c
                    .subscribe(sf -> sf.handle(
                            kafkaMessageHandler(producerFactory(), TEST_TOPIC1)
                                    .timestampExpression("T(Long).valueOf('1487694048633')"),
                            e -> e.id("kafkaProducer1")))
                    .subscribe(sf -> sf.handle(
                            kafkaMessageHandler(producerFactory(), TEST_TOPIC2)
                                   .timestamp(m -> 1487694048644L),
                            e -> e.id("kafkaProducer2")))
            );
}

@Bean
public DefaultKafkaHeaderMapper mapper() {
    return new DefaultKafkaHeaderMapper();
}

private KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandler(
        ProducerFactory<Integer, String> producerFactory, String topic) {
    return Kafka
            .outboundChannelAdapter(producerFactory)
            .messageKey(m -> m
                    .getHeaders()
                    .get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
            .headerMapper(mapper())
            .partitionId(m -> 10)
            .topicExpression("headers[kafka_topic] ?: '" + topic + "'")
            .configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
}
```

#### XML Configuration

The following example shows how to configure the Kafka outbound channel adapter with XML:

```
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-template="template"
                                    auto-startup="false"
                                    channel="inputToKafka"
                                    topic="foo"
                                    sync="false"
                                    message-key-expression="'bar'"
                                    send-failure-channel="failures"
                                    send-success-channel="successes"
                                    error-message-strategy="ems"
                                    partition-id-expression="2">
</int-kafka:outbound-channel-adapter>

<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    ... <!-- more producer properties -->
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
</bean>
```

### Message-driven Channel Adapter

The `KafkaMessageDrivenChannelAdapter` (`<int-kafka:message-driven-channel-adapter>`) uses a `spring-kafka` `KafkaMessageListenerContainer` or `ConcurrentMessageListenerContainer`.

The `mode` attribute is also available.
It can accept values of `record` or `batch` (default: `record`).
For `record` mode, each message payload is converted from a single `ConsumerRecord`.
For `batch` mode, the payload is a list of objects that are converted from all the `ConsumerRecord` instances returned by the consumer poll.
As with the batched `@KafkaListener`, the `KafkaHeaders.RECEIVED_MESSAGE_KEY`, `KafkaHeaders.RECEIVED_PARTITION_ID`, `KafkaHeaders.RECEIVED_TOPIC`, and `KafkaHeaders.OFFSET` headers are also lists, with positions corresponding to the position in the payload.
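
A minimal sketch of a batch-mode consumer that reads those parallel lists (the channel name and types are assumptions):

```
@ServiceActivator(inputChannel = "fromKafkaBatch")
public void listen(List<String> payloads,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {

    for (int i = 0; i < payloads.size(); i++) {
        // payloads.get(i) was read from topics.get(i) at offsets.get(i)
    }
}
```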

Received messages have certain headers populated.
See the [`KafkaHeaders` class](https://docs.spring.io/spring-kafka/api/org/springframework/kafka/support/KafkaHeaders.html) for more information.

|   |The `Consumer` object (in the `kafka_consumer` header) is not thread-safe.<br/>You must invoke its methods only on the thread that calls the listener within the adapter.<br/>If you hand off the message to another thread, you must not call its methods.|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

When a `retry-template` is provided, delivery failures are retried according to its retry policy.
An `error-channel` is not allowed in this case.
You can use the `recovery-callback` to handle the error when retries are exhausted.
In most cases, this is an `ErrorMessageSendingRecoverer` that sends the `ErrorMessage` to a channel.

When building an `ErrorMessage` (for use in the `error-channel` or `recovery-callback`), you can customize the error message by setting the `error-message-strategy` property.
By default, a `RawRecordHeaderErrorMessageStrategy` is used, to provide access to the converted message as well as the raw `ConsumerRecord`.

#### Java Configuration

The following example shows how to configure a message-driven channel adapter with Java:

```
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
            adapter(KafkaMessageListenerContainer<String, String> container) {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
            new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
    kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
    return kafkaMessageDrivenChannelAdapter;
}

@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
    ContainerProperties properties = new ContainerProperties(this.topic);
    // set more properties
    return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
    // set more properties
    return new DefaultKafkaConsumerFactory<>(props);
}
```

#### Java DSL Configuration

The following example shows how to configure a message-driven channel adapter with the Spring Integration Java DSL:

```
@Bean
public IntegrationFlow topic1ListenerFromKafkaFlow() {
    return IntegrationFlows
            .from(Kafka.messageDrivenChannelAdapter(consumerFactory(),
                    KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC1)
                    .configureListenerContainer(c ->
                            c.ackMode(AbstractMessageListenerContainer.AckMode.MANUAL)
                                    .id("topic1ListenerContainer"))
                    .recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel(),
                            new RawRecordHeaderErrorMessageStrategy()))
                    .retryTemplate(new RetryTemplate())
                    .filterInRetry(true))
            .filter(Message.class, m ->
                            m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer.class) < 101,
                    f -> f.throwExceptionOnRejection(true))
            .<String, String>transform(String::toUpperCase)
            .channel(c -> c.queue("listeningFromKafkaResults1"))
            .get();
}
```

You can also use the container factory that is used for `@KafkaListener` annotations to create `ConcurrentMessageListenerContainer` instances for other purposes.
See [the Spring for Apache Kafka documentation](https://docs.spring.io/spring-kafka/docs/current/reference/html/) for an example.

With the Java DSL, the container does not have to be configured as a `@Bean`, because the DSL registers the container as a bean.
The following example shows how to do so:

```
@Bean
public IntegrationFlow topic2ListenerFromKafkaFlow() {
    return IntegrationFlows
            .from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainerFactory().createContainer(TEST_TOPIC2),
                    KafkaMessageDrivenChannelAdapter.ListenerMode.record)
                .id("topic2Adapter"))
            ...
            .get();
}
```

Notice that, in this case, the adapter is given an `id` (`topic2Adapter`).
The container is registered in the application context with a name of `topic2Adapter.container`.
If the adapter does not have an `id` property, the container’s bean name is the container’s fully qualified class name plus `#n`, where `n` is incremented for each container.

#### XML Configuration

The following example shows how to configure a message-driven channel adapter with XML:

```
<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        mode="record"
        retry-template="template"
        recovery-callback="callback"
        error-message-strategy="ems"
        channel="someChannel"
        error-channel="errorChannel" />

<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                <entry key="bootstrap.servers" value="localhost:9092" />
                ...
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.kafka.listener.config.ContainerProperties">
            <constructor-arg name="topics" value="foo" />
        </bean>
    </constructor-arg>

</bean>
```

### Inbound Channel Adapter

The `KafkaMessageSource` provides a pollable channel adapter implementation.

#### Java Configuration

```
@InboundChannelAdapter(channel = "fromKafka", poller = @Poller(fixedDelay = "5000"))
@Bean
public KafkaMessageSource<String, String> source(ConsumerFactory<String, String> cf)  {
    KafkaMessageSource<String, String> source = new KafkaMessageSource<>(cf, "myTopic");
    source.setGroupId("myGroupId");
    source.setClientId("myClientId");
    return source;
}
```

Refer to the javadocs for available properties.

By default, if the consumer factory is a `DefaultKafkaConsumerFactory` and `max.poll.records` is not explicitly set on it, that property is forced to 1.
You can set the `allowMultiFetch` property to `true` to override this behavior.

|   |You must poll the consumer within `max.poll.interval.ms` to avoid a rebalance.<br/>If you set `allowMultiFetch` to `true` you must process all the retrieved records, and poll again, within `max.poll.interval.ms`.|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

Messages emitted by this adapter contain a header `kafka_remainingRecords` with a count of records remaining from the previous poll.
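
For example, a consumer of that channel can read the header like this (a minimal sketch, reusing the `fromKafka` channel from the example above):

```
@ServiceActivator(inputChannel = "fromKafka")
public void listen(String payload, @Header("kafka_remainingRecords") int remaining) {
    // remaining is 0 when this is the last record fetched by the previous poll
    System.out.println(payload + " (" + remaining + " records still buffered)");
}
```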

#### Java DSL Configuration

```
@Bean
public IntegrationFlow flow(ConsumerFactory<String, String> cf)  {
    return IntegrationFlows.from(Kafka.inboundChannelAdapter(cf, "myTopic")
                .groupId("myDslGroupId"), e -> e.poller(Pollers.fixedDelay(5000)))
            .handle(System.out::println)
            .get();
}
```

#### XML Configuration

```
<int-kafka:inbound-channel-adapter
        id="adapter1"
        consumer-factory="consumerFactory"
        ack-factory="ackFactory"
        topics="topic1"
        channel="inbound"
        client-id="client"
        group-id="group"
        message-converter="converter"
        payload-type="java.lang.String"
        raw-header="true"
        auto-startup="false"
        rebalance-listener="rebal">
    <int:poller fixed-delay="5000"/>
</int-kafka:inbound-channel-adapter>
```

### Outbound Gateway

The outbound gateway is for request/reply operations.
It differs from most Spring Integration gateways in that the sending thread does not block in the gateway and the reply is processed on the reply listener container thread.
If your code invokes the gateway behind a synchronous [Messaging Gateway](https://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#gateway), the user thread blocks there until the reply is received (or a timeout occurs).

|   |The gateway does not accept requests until the reply container has been assigned its topics and partitions.<br/>It is suggested that you add a `ConsumerRebalanceListener` to the template’s reply container properties and wait for the `onPartitionsAssigned` call before sending messages to the gateway.|
|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
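
A minimal sketch of that suggestion (the latch handling, topic name, and timeout are assumptions):

```
ContainerProperties containerProperties = new ContainerProperties("kReplies");
CountDownLatch assigned = new CountDownLatch(1);
containerProperties.setConsumerRebalanceListener(new ConsumerRebalanceListener() {

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // no action required
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        assigned.countDown(); // safe to send requests to the gateway now
    }

});
// elsewhere, before sending the first request:
// assigned.await(30, TimeUnit.SECONDS);
```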

The `KafkaProducerMessageHandler` `sendTimeoutExpression` default is the `delivery.timeout.ms` Kafka producer property `+ 5000` so that the actual Kafka error after a timeout is propagated to the application, instead of a timeout generated by this framework.
This has been changed for consistency, because otherwise you may get unexpected behavior (Spring may time out the send while it is actually, eventually, successful).
IMPORTANT: That timeout is 120 seconds by default, so you may wish to reduce it to get more timely failures.

#### Java Configuration

The following example shows how to configure a gateway with Java:

```
@Bean
@ServiceActivator(inputChannel = "kafkaRequests", outputChannel = "kafkaReplies")
public KafkaProducerMessageHandler<String, String> outGateway(
        ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
    return new KafkaProducerMessageHandler<>(kafkaTemplate);
}
```

Refer to the javadocs for available properties.

Notice that the same class as the [outbound channel adapter](#kafka-outbound) is used, the only difference being that the `KafkaTemplate` passed into the constructor is a `ReplyingKafkaTemplate`.
See [the Spring for Apache Kafka documentation](https://docs.spring.io/spring-kafka/docs/current/reference/html/) for more information.

The outbound topic, partition, key, and so on are determined in the same way as the outbound adapter.
The reply topic is determined as follows:

1. A message header named `KafkaHeaders.REPLY_TOPIC` (if present, it must have a `String` or `byte[]` value) is validated against the template’s reply container’s subscribed topics.

2. If the template’s `replyContainer` is subscribed to only one topic, it is used.

You can also specify a `KafkaHeaders.REPLY_PARTITION` header to determine a specific partition to be used for replies.
Again, this is validated against the template’s reply container’s subscriptions.
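
For example, a request with explicit reply routing might be built as follows (a minimal sketch; the topic and partition values are assumptions):

```
Message<String> request = MessageBuilder.withPayload("someRequest")
        .setHeader(KafkaHeaders.REPLY_TOPIC, "kReplies")
        .setHeader(KafkaHeaders.REPLY_PARTITION, 1) // optional; must match the reply container's subscriptions
        .build();
```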

#### Java DSL Configuration

The following example shows how to configure an outbound gateway with the Java DSL:

```
@Bean
public IntegrationFlow outboundGateFlow(
        ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {

    return IntegrationFlows.from("kafkaRequests")
            .handle(Kafka.outboundGateway(kafkaTemplate))
            .channel("kafkaReplies")
            .get();
}
```

Alternatively, you can use a configuration similar to the following bean:

```
@Bean
public IntegrationFlow outboundGateFlow() {
    return IntegrationFlows.from("kafkaRequests")
            .handle(Kafka.outboundGateway(producerFactory(), replyContainer())
                .configureKafkaTemplate(t -> t.replyTimeout(30_000)))
            .channel("kafkaReplies")
            .get();
}
```

#### XML Configuration

```
<int-kafka:outbound-gateway
    id="allProps"
    error-message-strategy="ems"
    kafka-template="template"
    message-key-expression="'key'"
    order="23"
    partition-id-expression="2"
    reply-channel="replies"
    reply-timeout="43"
    request-channel="requests"
    requires-reply="false"
    send-success-channel="successes"
    send-failure-channel="failures"
    send-timeout-expression="44"
    sync="true"
    timestamp-expression="T(System).currentTimeMillis()"
    topic-expression="'topic'"/>
```

### Inbound Gateway

The inbound gateway is for request/reply operations.

The following example shows how to configure an inbound gateway with Java:

```
@Bean
public KafkaInboundGateway<Integer, String, String> inboundGateway(
        AbstractMessageListenerContainer<Integer, String> container,
        KafkaTemplate<Integer, String> replyTemplate) {

    KafkaInboundGateway<Integer, String, String> gateway =
        new KafkaInboundGateway<>(container, replyTemplate);
    gateway.setRequestChannel(requests);
    gateway.setReplyChannel(replies);
    gateway.setReplyTimeout(30_000);
    return gateway;
}
```

Refer to the javadocs for available properties.

The following example shows how to configure a simple upper case converter with the Java DSL:

```
@Bean
public IntegrationFlow serverGateway(
        ConcurrentMessageListenerContainer<Integer, String> container,
        KafkaTemplate<Integer, String> replyTemplate) {
    return IntegrationFlows
            .from(Kafka.inboundGateway(container, replyTemplate)
                .replyTimeout(30_000))
            .<String, String>transform(String::toUpperCase)
            .get();
}
```

Alternatively, you could configure an upper-case converter by using code similar to the following:

```
@Bean
public IntegrationFlow serverGateway() {
    return IntegrationFlows
            .from(Kafka.inboundGateway(consumerFactory(), containerProperties(),
                    producerFactory())
                .replyTimeout(30_000))
            .<String, String>transform(String::toUpperCase)
            .get();
}
```

You can also use the container factory that is used for `@KafkaListener` annotations to create `ConcurrentMessageListenerContainer` instances for other purposes.
See [the Spring for Apache Kafka documentation](https://docs.spring.io/spring-kafka/docs/current/reference/html/) and [Message-driven Channel Adapter](#kafka-inbound) for examples.

#### XML Configuration

```
<int-kafka:inbound-gateway
        id="gateway1"
        listener-container="container1"
        kafka-template="template"
        auto-startup="false"
        phase="100"
        request-timeout="5000"
        request-channel="nullChannel"
        reply-channel="errorChannel"
        reply-timeout="43"
        message-converter="messageConverter"
        payload-type="java.lang.String"
        error-message-strategy="ems"
        retry-template="retryTemplate"
        recovery-callback="recoveryCallback"/>
```

See the XML schema for a description of each property.

### Channels Backed by Apache Kafka Topics

Spring Integration has `MessageChannel` implementations backed by an Apache Kafka topic for persistence.

Each channel requires a `KafkaTemplate` for the sending side and either a listener container factory (for subscribable channels) or a `KafkaMessageSource` for a pollable channel.

#### Java DSL Configuration

```
@Bean
public IntegrationFlow flowWithSubscribable(KafkaTemplate<Integer, String> template,
        ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {

    return IntegrationFlows.from(...)
            ...
            .channel(Kafka.channel(template, containerFactory, "someTopic1").groupId("group1"))
            ...
            .get();
}

@Bean
public IntegrationFlow flowWithPubSub(KafkaTemplate<Integer, String> template,
        ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {

    return IntegrationFlows.from(...)
            ...
            .publishSubscribeChannel(pubSub(template, containerFactory),
                pubsub -> pubsub
                            .subscribe(subflow -> ...)
                            .subscribe(subflow -> ...))
            .get();
}

@Bean
public BroadcastCapableChannel pubSub(KafkaTemplate<Integer, String> template,
        ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {

    return Kafka.publishSubscribeChannel(template, containerFactory, "someTopic2")
            .groupId("group2")
            .get();
}

@Bean
public IntegrationFlow flowWithPollable(KafkaTemplate<Integer, String> template,
        KafkaMessageSource<Integer, String> source) {

    return IntegrationFlows.from(...)
            ...
            .channel(Kafka.pollableChannel(template, source, "someTopic3").groupId("group3"))
            .handle(...,  e -> e.poller(...))
            ...
            .get();
}
```

#### Java Configuration

```
/**
 * Channel for a single subscriber.
 */
@Bean
SubscribableKafkaChannel pointToPoint(KafkaTemplate<String, String> template,
    KafkaListenerContainerFactory<String, String> factory) {

    SubscribableKafkaChannel channel =
        new SubscribableKafkaChannel(template, factory, "topicA");
    channel.setGroupId("group1");
    return channel;
}

/**
 * Channel for multiple subscribers.
 */
@Bean
SubscribableKafkaChannel pubsub(KafkaTemplate<String, String> template,
    KafkaListenerContainerFactory<String, String> factory) {

    SubscribableKafkaChannel channel =
        new SubscribableKafkaChannel(template, factory, "topicB", true);
    channel.setGroupId("group2");
    return channel;
}

/**
 * Pollable channel (topic is configured on the source).
 */
@Bean
PollableKafkaChannel pollable(KafkaTemplate<String, String> template,
    KafkaMessageSource<String, String> source) {

    PollableKafkaChannel channel =
        new PollableKafkaChannel(template, source);
    channel.setGroupId("group3");
    return channel;
}
```

#### XML Configuration

```
<int-kafka:channel kafka-template="template" id="ptp" topic="ptpTopic" group-id="ptpGroup"
    container-factory="containerFactory" />

<int-kafka:pollable-channel kafka-template="template" id="pollable" message-source="source"
    group-id="pollableGroup"/>

<int-kafka:publish-subscribe-channel kafka-template="template" id="pubSub" topic="pubSubTopic"
    group-id="pubSubGroup" container-factory="containerFactory" />
```

### Message Conversion

A `StringJsonMessageConverter` is provided.
See [the Spring for Apache Kafka documentation](https://docs.spring.io/spring-kafka/docs/current/reference/html/) for more information.

When using this converter with a message-driven channel adapter, you can specify the type to which you want the incoming payload to be converted.
This is achieved by setting the `payload-type` attribute (`payloadType` property) on the adapter.
The following example shows how to do so in XML configuration:

```
<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        channel="nullChannel"
        message-converter="messageConverter"
        payload-type="com.example.Foo"
        error-channel="errorChannel" />

<bean id="messageConverter"
    class="org.springframework.kafka.support.converter.MessagingMessageConverter"/>
```

The following example shows how to set the `payload-type` attribute (`payloadType` property) on the adapter in Java configuration:

```
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
            adapter(KafkaMessageListenerContainer<String, String> container) {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
            new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
    kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
    kafkaMessageDrivenChannelAdapter.setMessageConverter(converter());
    kafkaMessageDrivenChannelAdapter.setPayloadType(Foo.class);
    return kafkaMessageDrivenChannelAdapter;
}
```

### Null Payloads and Log Compaction 'Tombstone' Records

Spring Messaging `Message<?>` objects cannot have `null` payloads.
When you use the endpoints for Apache Kafka, `null` payloads (also known as tombstone records) are represented by a payload of type `KafkaNull`.
See [the Spring for Apache Kafka documentation](https://docs.spring.io/spring-kafka/docs/current/reference/html/) for more information.

The POJO methods for Spring Integration endpoints can use a true `null` value instead of `KafkaNull`.
To do so, mark the parameter with `@Payload(required = false)`.
The following example shows how to do so:

```
@ServiceActivator(inputChannel = "fromSomeKafkaInboundEndpoint")
public void in(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
               @Payload(required = false) Customer customer) {
    // customer is null if a tombstone record
    ...
}
```
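
On the sending side, a tombstone can be published by using `KafkaNull` as the payload (a minimal sketch; the `toKafka` channel and the key are assumptions):

```
toKafka.send(MessageBuilder.withPayload(KafkaNull.INSTANCE)
        .setHeader(KafkaHeaders.MESSAGE_KEY, "someKey")
        .build());
```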

### Calling a Spring Integration flow from a `KStream`

You can use a `MessagingTransformer` to invoke an integration flow from a `KStream`:

```
@Bean
public KStream<byte[], byte[]> kStream(StreamsBuilder kStreamBuilder,
        MessagingTransformer<byte[], byte[], byte[]> transformer) {
    KStream<byte[], byte[]> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    stream.mapValues((ValueMapper<byte[], byte[]>) String::toUpperCase)
            ...
            .transform(() -> transformer)
            .to(streamingTopic2);

    stream.print(Printed.toSysOut());

    return stream;
}

@Bean
@DependsOn("flow")
public MessagingTransformer<byte[], byte[], String> transformer(
        MessagingFunction function) {

    MessagingMessageConverter converter = new MessagingMessageConverter();
    converter.setHeaderMapper(new SimpleKafkaHeaderMapper("*"));
    return new MessagingTransformer<>(function, converter);
}

@Bean
public IntegrationFlow flow() {
    return IntegrationFlows.from(MessagingFunction.class)
        ...
        .get();
}
```

When an integration flow starts with an interface, the proxy that is created has the name of the flow bean, appended with ".gateway", so this bean name can be used as a `@Qualifier` if needed.
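
For example, given the `flow` bean above, the proxy could be injected like this (a minimal sketch):

```
@Autowired
@Qualifier("flow.gateway")
MessagingFunction function;
```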

### Performance Considerations for read/process/write Scenarios

Many applications consume from a topic, perform some processing, and write to another topic.
In most cases, if the write fails, the application would want to throw an exception so that the incoming request can be retried and/or sent to a dead-letter topic.
This functionality is supported by the underlying message listener container, together with a suitably configured error handler.
However, to support this, we need to block the listener thread until the success (or failure) of the write operation, so that any exceptions can be thrown to the container.
When consuming single records, this is achieved by setting the `sync` property on the outbound adapter.
However, when consuming batches, using `sync` causes a significant performance degradation, because the application would wait for the result of each send before sending the next message.
You can also perform multiple sends and then wait for the results of those sends afterwards.
This is achieved by adding a `futuresChannel` to the message handler.
To enable the feature, add `KafkaIntegrationHeaders.FUTURE_TOKEN` to the outbound messages; this can then be used to correlate a `Future` to a particular sent message.
Here is an example of how you might use this feature:

```
@SpringBootApplication
public class FuturesChannelApplication {

    public static void main(String[] args) {
        SpringApplication.run(FuturesChannelApplication.class, args);
    }

    @Bean
    IntegrationFlow inbound(ConsumerFactory<String, String> consumerFactory, Handler handler) {
        return IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(consumerFactory,
                    ListenerMode.batch, "inTopic"))
                .handle(handler)
                .get();
    }

    @Bean
    IntegrationFlow outbound(KafkaTemplate<String, String> kafkaTemplate) {
        return IntegrationFlows.from(Gate.class)
                .enrichHeaders(h -> h
                        .header(KafkaHeaders.TOPIC, "outTopic")
                        .headerExpression(KafkaIntegrationHeaders.FUTURE_TOKEN, "headers[id]"))
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                        .futuresChannel("futures"))
                .get();
    }

    @Bean
    PollableChannel futures() {
        return new QueueChannel();
    }

}

@Component
@DependsOn("outbound")
class Handler {

    @Autowired
    Gate gate;

    @Autowired
    PollableChannel futures;

    public void handle(List<String> input) throws Exception {
        System.out.println(input);
        input.forEach(str -> this.gate.send(str.toUpperCase()));
        for (int i = 0; i < input.size(); i++) {
            Message<?> future = this.futures.receive(10000);
            ((Future<?>) future.getPayload()).get(10, TimeUnit.SECONDS);
        }
    }

}

interface Gate {

    void send(String out);

}
```