amqp.md 154.6 KB
Newer Older
茶陵後's avatar
茶陵後 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079
# AMQP Support

## AMQP Support

Spring Integration provides channel adapters for receiving and sending messages by using the Advanced Message Queuing Protocol (AMQP).

You need to include this dependency into your project:

Maven

```
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-amqp</artifactId>
    <version>5.5.9</version>
</dependency>
```

Gradle

```
compile "org.springframework.integration:spring-integration-amqp:5.5.9"
```

The following adapters are available:

* [Inbound Channel Adapter](#amqp-inbound-channel-adapter)

* [Inbound Gateway](#amqp-inbound-gateway)

* [Outbound Channel Adapter](#amqp-outbound-channel-adapter)

* [Outbound Gateway](#amqp-outbound-gateway)

* [Async Outbound Gateway](#amqp-async-outbound-gateway)

Spring Integration also provides a point-to-point message channel and a publish-subscribe message channel backed by AMQP Exchanges and Queues.

To provide AMQP support, Spring Integration relies on ([Spring AMQP](https://projects.spring.io/spring-amqp)), which applies core Spring concepts to the development of AMQP-based messaging solutions.
Spring AMQP provides similar semantics to ([Spring JMS](https://docs.spring.io/spring/docs/current/spring-framework-reference/integration.html#jms)).

Whereas the provided AMQP Channel Adapters are intended for unidirectional messaging (send or receive) only, Spring Integration also provides inbound and outbound AMQP gateways for request-reply operations.

TIP:
You should familiarize yourself with the [reference documentation of the Spring AMQP project](https://docs.spring.io/spring-amqp/reference/html/).
It provides much more in-depth information about Spring’s integration with AMQP in general and RabbitMQ in particular.

### Inbound Channel Adapter

The following listing shows the possible configuration options for an AMQP Inbound Channel Adapter:

Java DSL

```
@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
    return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "aName"))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
}
```

Java

```
@Bean
public MessageChannel amqpInputChannel() {
    return new DirectChannel();
}

@Bean
public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
        @Qualifier("amqpInputChannel") MessageChannel channel) {
    AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
    adapter.setOutputChannel(channel);
    return adapter;
}

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container =
                               new SimpleMessageListenerContainer(connectionFactory);
    container.setQueueNames("aName");
    container.setConcurrentConsumers(2);
    // ...
    return container;
}

@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
    return new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println(message.getPayload());
        }

    };
}
```

XML

```
<int-amqp:inbound-channel-adapter
                                  id="inboundAmqp"                (1)
                                  channel="inboundChannel"        (2)
                                  queue-names="si.test.queue"     (3)
                                  acknowledge-mode="AUTO"         (4)
                                  advice-chain=""                 (5)
                                  channel-transacted=""           (6)
                                  concurrent-consumers=""         (7)
                                  connection-factory=""           (8)
                                  error-channel=""                (9)
                                  expose-listener-channel=""      (10)
                                  header-mapper=""                (11)
                                  mapped-request-headers=""       (12)
                                  listener-container=""           (13)
                                  message-converter=""            (14)
                                  message-properties-converter="" (15)
                                  phase=""                        (16)
                                  prefetch-count=""               (17)
                                  receive-timeout=""              (18)
                                  recovery-interval=""            (19)
                                  missing-queues-fatal=""         (20)
                                  shutdown-timeout=""             (21)
                                  task-executor=""                (22)
                                  transaction-attribute=""        (23)
                                  transaction-manager=""          (24)
                                  tx-size=""                      (25)
                                  consumers-per-queue             (26)
                                  batch-mode="MESSAGES"/>         (27)
```

|**1** |                                                                                                                                                                                                                                                                                                                                                                  The unique ID for this adapter.<br/>Optional.                                                                                                                                                                                                                                                                                                                                                                   |
|------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|**2** |                                                                                                                                                                                                                                                                                                                                                    Message channel to which converted messages should be sent.<br/>Required.                                                                                                                                                                                                                                                                                                                                                     |
|**3** |                                                                                                                                                                                                                                                                                                                                      Names of the AMQP queues (comma-separated list) from which messages should be consumed.<br/>Required.                                                                                                                                                                                                                                                                                                                                       |
|**4** |                                                                                                                                         Acknowledge mode for the `MessageListenerContainer`.<br/>When set to `MANUAL`, the delivery tag and channel are provided in message headers `amqp_deliveryTag` and `amqp_channel`, respectively.<br/>The user application is responsible for acknowledgement.`NONE` means no acknowledgements (`autoAck`).`AUTO` means the adapter’s container acknowledges when the downstream flow completes.<br/>Optional (defaults to AUTO).<br/>See [Inbound Endpoint Acknowledge Mode](#amqp-inbound-ack).                                                                                                                                         |
|**5** |                                                                                                                                                                                                                                                                                                                                  Extra AOP Advices to handle cross-cutting behavior associated with this inbound channel adapter.<br/>Optional.                                                                                                                                                                                                                                                                                                                                  |
|**6** |                                                                                                                                                                                                                           Flag to indicate that channels created by this component are transactional.<br/>If true, it tells the framework to use a transactional channel and to end all operations (send or receive) with a commit or rollback, depending on the outcome, with an exception that signals a rollback.<br/>Optional (Defaults to false).                                                                                                                                                                                                                           |
|**7** |                                                                                                                                                                         Specify the number of concurrent consumers to create.<br/>The default is `1`.<br/>We recommend raising the number of concurrent consumers to scale the consumption of messages coming in from a queue.<br/>However, note that any ordering guarantees are lost once multiple consumers are registered.<br/>In general, use one consumer for low-volume queues.<br/>Not allowed when 'consumers-per-queue' is set.<br/>Optional.                                                                                                                                                                          |
|**8** |                                                                                                                                                                                                                                                                                                                                       Bean reference to the RabbitMQ `ConnectionFactory`.<br/>Optional (defaults to `connectionFactory`).                                                                                                                                                                                                                                                                                                                                        |
|**9** |                                                                                                                                                                                                                                                                                                                                                      Message channel to which error messages should be sent.<br/>Optional.                                                                                                                                                                                                                                                                                                                                                       |
|**10**|                                                                                                                                                                                                                                                                                                              Whether the listener channel (com.rabbitmq.client.Channel) is exposed to a registered `ChannelAwareMessageListener`.<br/>Optional (defaults to true).                                                                                                                                                                                                                                                                                                               |
|**11**|                                                                                                                                                                                 A reference to an `AmqpHeaderMapper` to use when receiving AMQP Messages.<br/>Optional.<br/>By default, only standard AMQP properties (such as `contentType`) are copied to Spring Integration `MessageHeaders`.<br/>Any user-defined headers within the AMQP `MessageProperties` are NOT copied to the message by the default `DefaultAmqpHeaderMapper`.<br/>Not allowed if 'request-header-names' is provided.                                                                                                                                                                                 |
|**12**|                                                                                                                                                                                                              Comma-separated list of the names of AMQP Headers to be mapped from the AMQP request into the `MessageHeaders`.<br/>This can only be provided if the 'header-mapper' reference is not provided.<br/>The values in this list can also be simple patterns to be matched against the header names (such as "\*" or "thing1\*, thing2" or "\*something").                                                                                                                                                                                                               |
|**13**|                                                                                         Reference to the `AbstractMessageListenerContainer` to use for receiving AMQP Messages.<br/>If this attribute is provided, no other attribute related to the listener container configuration should be provided.<br/>In other words, by setting this reference, you must take full responsibility for the listener container configuration.<br/>The only exception is the `MessageListener` itself.<br/>Since that is actually the core responsibility of this channel adapter implementation, the referenced listener container must not already have its own `MessageListener`.<br/>Optional.                                                                                         |
|**14**|                                                                                                                                                                                                                                                                                                                                                    The `MessageConverter` to use when receiving AMQP messages.<br/>Optional.                                                                                                                                                                                                                                                                                                                                                     |
|**15**|                                                                                                                                                                                                                                                                                                                                               The `MessagePropertiesConverter` to use when receiving AMQP messages.<br/>Optional.                                                                                                                                                                                                                                                                                                                                                |
|**16**|                                                                                                                                                                                                    Specifies the phase in which the underlying `AbstractMessageListenerContainer` should be started and stopped.<br/>The startup order proceeds from lowest to highest, and the shutdown order is the reverse of that.<br/>By default, this value is `Integer.MAX_VALUE`, meaning that this container starts as late as possible and stops as soon as possible.<br/>Optional.                                                                                                                                                                                                    |
|**17**|                                                                                                                                                                                                                                      Tells the AMQP broker how many messages to send to each consumer in a single request.<br/>Often, you can set this value high to improve throughput.<br/>It should be greater than or equal to the transaction size (see the `tx-size` attribute, later in this list).<br/>Optional (defaults to `1`).                                                                                                                                                                                                                                       |
|**18**|                                                                                                                                                                                                                                                                                                                                                       Receive timeout in milliseconds.<br/>Optional (defaults to `1000`).                                                                                                                                                                                                                                                                                                                                                        |
|**19**|                                                                                                                                                                                                                                                                                                           Specifies the interval between recovery attempts of the underlying `AbstractMessageListenerContainer` (in milliseconds).<br/>Optional (defaults to `5000`).                                                                                                                                                                                                                                                                                                            |
|**20**|                                                                                                                                                                    If 'true' and none of the queues are available on the broker, the container throws a fatal exception during startup and stops if the queues are deleted when the container is running (after making three attempts to passively declare the queues).<br/>If `false`, the container does not throw an exception and goes into recovery mode, attempting to restart according to the `recovery-interval`.<br/>Optional (defaults to `true`).                                                                                                                                                                    |
|**21**|                                                                                                                                                     The time to wait for workers (in milliseconds) after the underlying `AbstractMessageListenerContainer` is stopped and before the AMQP connection is forced closed.<br/>If any workers are active when the shutdown signal comes, they are allowed to finish processing as long as they can finish within this timeout.<br/>Otherwise, the connection is closed and messages remain unacknowledged (if the channel is transactional).<br/>Optional (defaults to `5000`).                                                                                                                                                      |
|**22**|                                                                                                                                                             By default, the underlying `AbstractMessageListenerContainer` uses a `SimpleAsyncTaskExecutor` implementation, that fires up a new thread for each task, running it asynchronously.<br/>By default, the number of concurrent threads is unlimited.<br/>Note that this implementation does not reuse threads.<br/>Consider using a thread-pooling `TaskExecutor` implementation as an alternative.<br/>Optional (defaults to `SimpleAsyncTaskExecutor`).                                                                                                                                                              |
|**23**|                                                                                                                                                                                                                                                      By default, the underlying `AbstractMessageListenerContainer` creates a new instance of the `DefaultTransactionAttribute` (it takes the EJB approach to rolling back on runtime but not checked exceptions).<br/>Optional (defaults to `DefaultTransactionAttribute`).                                                                                                                                                                                                                                                      |
|**24**|Sets a bean reference to an external `PlatformTransactionManager` on the underlying `AbstractMessageListenerContainer`.<br/>The transaction manager works in conjunction with the `channel-transacted` attribute.<br/>If there is already a transaction in progress when the framework is sending or receiving a message and the `channelTransacted` flag is `true`, the commit or rollback of the messaging transaction is deferred until the end of the current transaction.<br/>If the `channelTransacted` flag is `false`, no transaction semantics apply to the messaging operation (it is auto-acked).<br/>For further information, see[Transactions with Spring AMQP](https://docs.spring.io/spring-amqp/reference/html/%255Freference.html#%5Ftransactions).<br/>Optional.|
|**25**|                                                                                                                                                                                                                                Tells the `SimpleMessageListenerContainer` how many messages to process in a single transaction (if the channel is transactional).<br/>For best results, it should be less than or equal to the value set in `prefetch-count`.<br/>Not allowed when 'consumers-per-queue' is set.<br/>Optional (defaults to `1`).                                                                                                                                                                                                                                 |
|**26**|                                                                                                                                                                                                                                                   Indicates that the underlying listener container should be a `DirectMessageListenerContainer` instead of the default `SimpleMessageListenerContainer`.<br/>See the [Spring AMQP Reference Manual](https://docs.spring.io/spring-amqp/reference/html/) for more information.                                                                                                                                                                                                                                                    |
|**27**|           When the container’s `consumerBatchEnabled` is `true`, determines how the adapter presents the batch of messages in the message payload.<br/>When set to `MESSAGES` (default), the payload is a `List<Message<?>>` where each message has headers mapped from the incoming AMQP `Message` and the payload is the converted `body`.<br/>When set to `EXTRACT_PAYLOADS`, the payload is a `List<?>` where the elements are converted from the AMQP `Message` body.`EXTRACT_PAYLOADS_WITH_HEADERS` is similar to `EXTRACT_PAYLOADS` but, in addition, the headers from each message are mapped from the `MessageProperties` into a `List<Map<String, Object>` at the corresponding index; the header name is `AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS`.            |

|   |container<br/><br/>Note that when configuring an external container with XML, you cannot use the Spring AMQP namespace to define the container.<br/>This is because the namespace requires at least one `<listener/>` element.<br/>In this environment, the listener is internal to the adapter.<br/>For this reason, you must define the container by using a normal Spring `<bean/>` definition, as the following example shows:<br/><br/>```<br/><bean id="container"<br/> class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"><br/>    <property name="connectionFactory" ref="connectionFactory" /><br/>    <property name="queueNames" value="aName.queue" /><br/>    <property name="defaultRequeueRejected" value="false"/><br/></bean><br/>```|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

|   |Even though the Spring Integration JMS and AMQP support is similar, important differences exist.<br/>The JMS inbound channel adapter is using a `JmsDestinationPollingSource` under the covers and expects a configured poller.<br/>The AMQP inbound channel adapter uses an `AbstractMessageListenerContainer` and is message driven.<br/>In that regard, it is more similar to the JMS message-driven channel adapter.|
|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

Starting with version 5.5, the `AmqpInboundChannelAdapter` can be configured with an `org.springframework.amqp.rabbit.retry.MessageRecoverer` strategy which is used in the `RecoveryCallback` when the retry operation is called internally.
See `setMessageRecoverer()` JavaDocs for more information.

#### Batched Messages

See [the Spring AMQP Documentation](https://docs.spring.io/spring-amqp/docs/current/reference/html/#template-batching) for more information about batched messages.

To produce batched messages with Spring Integration, simply configure the outbound endpoint with a `BatchingRabbitTemplate`.

When receiving batched messages, by default, the listener containers extract each fragment message and the adapter will produce a `Message<?>` for each fragment.
Starting with version 5.2, if the container’s `deBatchingEnabled` property is set to `false`, the de-batching is performed by the adapter instead, and a single `Message<List<?>>` is produced with the payload being a list of the fragment payloads (after conversion if appropriate).

The default `BatchingStrategy` is the `SimpleBatchingStrategy`, but this can be overridden on the adapter.

|   |The `org.springframework.amqp.rabbit.retry.MessageBatchRecoverer` must be used with batches when recovery is required for retry operations.|
|---|-------------------------------------------------------------------------------------------------------------------------------------------|

### Polled Inbound Channel Adapter

#### Overview

Version 5.0.1 introduced a polled channel adapter, letting you fetch individual messages on demand — for example, with a `MessageSourcePollingTemplate` or a poller.
See [Deferred Acknowledgment Pollable Message Source](./polling-consumer.html#deferred-acks-message-source) for more information.

It does not currently support XML configuration.

The following example shows how to configure an `AmqpMessageSource`:

Java DSL

```
@Bean
public IntegrationFlow flow() {
    return IntegrationFlows.from(Amqp.inboundPolledAdapter(connectionFactory(), DSL_QUEUE),
                    e -> e.poller(Pollers.fixedDelay(1_000)).autoStartup(false))
            .handle(p -> {
                ...
            })
            .get();
}
```

Java

```
@Bean
public AmqpMessageSource source(ConnectionFactory connectionFactory) {
    return new AmqpMessageSource(connectionFactory, "someQueue");
}
```

See the [Javadoc](https://docs.spring.io/spring-integration/api/org/springframework/integration/amqp/inbound/AmqpMessageSource.html) for configuration properties.

XML

```
This adapter currently does not have XML configuration support.
```

#### Batched Messages

See [Batched Messages](#amqp-debatching).

For the polled adapter, there is no listener container, batched messages are always debatched (if the `BatchingStrategy` supports doing so).

### Inbound Gateway

The inbound gateway supports all the attributes on the inbound channel adapter (except that 'channel' is replaced by 'request-channel'), plus some additional attributes.
The following listing shows the available attributes:

Java DSL

```
@Bean // return the upper cased payload
public IntegrationFlow amqpInboundGateway(ConnectionFactory connectionFactory) {
    return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, "foo"))
            .transform(String.class, String::toUpperCase)
            .get();
}
```

Java

```
@Bean
public MessageChannel amqpInputChannel() {
    return new DirectChannel();
}

@Bean
public AmqpInboundGateway inbound(SimpleMessageListenerContainer listenerContainer,
        @Qualifier("amqpInputChannel") MessageChannel channel) {
    AmqpInboundGateway gateway = new AmqpInboundGateway(listenerContainer);
    gateway.setRequestChannel(channel);
    gateway.setDefaultReplyTo("bar");
    return gateway;
}

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container =
                    new SimpleMessageListenerContainer(connectionFactory);
    container.setQueueNames("foo");
    container.setConcurrentConsumers(2);
    // ...
    return container;
}

@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
    return new AbstractReplyProducingMessageHandler() {

        @Override
        protected Object handleRequestMessage(Message<?> requestMessage) {
            return "reply to " + requestMessage.getPayload();
        }

    };
}
```

XML

```
<int-amqp:inbound-gateway
                          id="inboundGateway"                (1)
                          request-channel="myRequestChannel" (2)
                          header-mapper=""                   (3)
                          mapped-request-headers=""          (4)
                          mapped-reply-headers=""            (5)
                          reply-channel="myReplyChannel"     (6)
                          reply-timeout="1000"               (7)
                          amqp-template=""                   (8)
                          default-reply-to="" />             (9)
```

|**1**|                                                                                                                                                                                                                                                                                           The Unique ID for this adapter.<br/>Optional.                                                                                                                                                                                                                                                                                            |
|-----|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|**2**|                                                                                                                                                                                                                                                                                Message channel to which converted messages are sent.<br/>Required.                                                                                                                                                                                                                                                                                 |
|**3**|                                                                                   A reference to an `AmqpHeaderMapper` to use when receiving AMQP Messages.<br/>Optional.<br/>By default, only standard AMQP properties (such as `contentType`) are copied to and from Spring Integration `MessageHeaders`.<br/>Any user-defined headers within the AMQP `MessageProperties` are not copied to or from an AMQP message by the default `DefaultAmqpHeaderMapper`.<br/>Not allowed if 'request-header-names' or 'reply-header-names' is provided.                                                                                    |
|**4**|                                                                                                                                      Comma-separated list of names of AMQP Headers to be mapped from the AMQP request into the `MessageHeaders`.<br/>This attribute can be provided only if the 'header-mapper' reference is not provided.<br/>The values in this list can also be simple patterns to be matched against the header names (e.g. `"*"` or `"thing1*, thing2"` or `"*thing1"`).                                                                                                                                      |
|**5**|                                                  Comma-separated list of names of `MessageHeaders` to be mapped into the AMQP message properties of the AMQP reply message.<br/>All standard Headers (such as `contentType`) are mapped to AMQP Message Properties, while user-defined headers are mapped to the 'headers' property.<br/>This attribute can only be provided if the 'header-mapper' reference is not provided.<br/>The values in this list can also be simple patterns to be matched against the header names (for example, `"*"` or `"foo*, bar"` or `"*foo"`).                                                   |
|**6**|                                                                                                                                                                                                                                                                                  Message Channel where reply Messages are expected.<br/>Optional.                                                                                                                                                                                                                                                                                  |
|**7**|                                                                                                                                                                    Sets the `receiveTimeout` on the underlying `o.s.i.core.MessagingTemplate` for receiving messages from the reply channel.<br/>If not specified, this property defaults to `1000` (1 second).<br/>Only applies if the container thread hands off to another thread before the reply is sent.                                                                                                                                                                     |
|**8**|                                                                                                                                                                                                                          The customized `AmqpTemplate` bean reference (to have more control over the reply messages to send).<br/>You can provide an alternative implementation to the `RabbitTemplate`.                                                                                                                                                                                                                           |
|**9**|The `replyTo` `o.s.amqp.core.Address` to be used when the `requestMessage` does not have a `replyTo`property.<br/>If this option is not specified, no `amqp-template` is provided, no `replyTo` property exists in the request message, and<br/>an `IllegalStateException` is thrown because the reply cannot be routed.<br/>If this option is not specified and an external `amqp-template` is provided, no exception is thrown.<br/>You must either specify this option or configure a default `exchange` and `routingKey` on that template,<br/>if you anticipate cases when no `replyTo` property exists in the request message.|

See the note in [Inbound Channel Adapter](#amqp-inbound-channel-adapter) about configuring the `listener-container` attribute.

Starting with version 5.5, the `AmqpInboundChannelAdapter` can be configured with an `org.springframework.amqp.rabbit.retry.MessageRecoverer` strategy which is used in the `RecoveryCallback` when the retry operation is called internally.
See `setMessageRecoverer()` JavaDocs for more information.

#### Batched Messages

See [Batched Messages](#amqp-debatching).

### Inbound Endpoint Acknowledge Mode

By default, the inbound endpoints use the `AUTO` acknowledge mode, which means the container automatically acknowledges the message when the downstream integration flow completes (or a message is handed off to another thread by using a `QueueChannel` or `ExecutorChannel`).
Setting the mode to `NONE` configures the consumer such that acknowledgments are not used at all (the broker automatically acknowledges the message as soon as it is sent).
Setting the mode to `MANUAL` lets user code acknowledge the message at some other point during processing.
To support this, with this mode, the endpoints provide the `Channel` and `deliveryTag` in the `amqp_channel` and `amqp_deliveryTag` headers, respectively.

You can perform any valid Rabbit command on the `Channel` but, generally, only `basicAck` and `basicNack` (or `basicReject`) are used.
In order to not interfere with the operation of the container, you should not retain a reference to the channel and use it only in the context of the current message.

|   |Since the `Channel` is a reference to a “live” object, it cannot be serialized and is lost if a message is persisted.|
|---|---------------------------------------------------------------------------------------------------------------------|

The following example shows how you might use `MANUAL` acknowledgement:

```
@ServiceActivator(inputChannel = "foo", outputChannel = "bar")
public Object handle(@Payload String payload, @Header(AmqpHeaders.CHANNEL) Channel channel,
        @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws Exception {

    // Do some processing

    if (allOK) {
        channel.basicAck(deliveryTag, false);

        // perhaps do some more processing

    }
    else {
        channel.basicNack(deliveryTag, false, true);
    }
    return someResultForDownStreamProcessing;
}
```

### Outbound Endpoints

The following outbound endpoints have many similar configuration options.
Starting with version 5.2, the `confirm-timeout` has been added.
Normally, when publisher confirms are enabled, the broker will quickly return an ack (or nack) which will be sent to the appropriate channel.
If a channel is closed before the confirm is received, the Spring AMQP framework will synthesize a nack.
"Missing" acks should never occur but, if you set this property, the endpoint will periodically check for them and synthesize a nack if the time elapses without a confirm being received.

### Outbound Channel Adapter

The following example shows the available properties for an AMQP outbound channel adapter:

Java DSL

```
@Bean
public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate,
        MessageChannel amqpOutboundChannel) {
    return IntegrationFlows.from(amqpOutboundChannel)
            .handle(Amqp.outboundAdapter(amqpTemplate)
                        .routingKey("queue1")) // default exchange - route to queue 'queue1'
            .get();
}
```

Java

```
@Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {
    AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
    outbound.setRoutingKey("queue1"); // default exchange - route to queue 'queue1'
    return outbound;
}

@Bean
public MessageChannel amqpOutboundChannel() {
    return new DirectChannel();
}
```

XML

```
<int-amqp:outbound-channel-adapter id="outboundAmqp"             (1)
                               channel="outboundChannel"         (2)
                               amqp-template="myAmqpTemplate"    (3)
                               exchange-name=""                  (4)
                               exchange-name-expression=""       (5)
                               order="1"                         (6)
                               routing-key=""                    (7)
                               routing-key-expression=""         (8)
                               default-delivery-mode""           (9)
                               confirm-correlation-expression="" (10)
                               confirm-ack-channel=""            (11)
                               confirm-nack-channel=""           (12)
                               confirm-timeout=""                (13)
                               wait-for-confirm=""               (14)
                               return-channel=""                 (15)
                               error-message-strategy=""         (16)
                               header-mapper=""                  (17)
                               mapped-request-headers=""         (18)
                               lazy-connect="true"               (19)
                               multi-send="false"/>              (20)
```

|**1** |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    The unique ID for this adapter.<br/>Optional.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|**2** |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               Message channel to which messages should be sent to have them converted and published to an AMQP exchange.<br/>Required.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
|**3** |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              Bean reference to the configured AMQP template.<br/>Optional (defaults to `amqpTemplate`).                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
|**4** |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       The name of the AMQP exchange to which messages are sent.<br/>If not provided, messages are sent to the default, no-name exchange.<br/>Mutually exclusive with 'exchange-name-expression'.<br/>Optional.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|**5** |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 A SpEL expression that is evaluated to determine the name of the AMQP exchange to which messages are sent, with the message as the root object.<br/>If not provided, messages are sent to the default, no-name exchange.<br/>Mutually exclusive with 'exchange-name'.<br/>Optional.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
|**6** |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            The order for this consumer when multiple consumers are registered, thereby enabling load-balancing and failover.<br/>Optional (defaults to `Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]`).                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
|**7** |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          The fixed routing-key to use when sending messages.<br/>By default, this is an empty `String`.<br/>Mutually exclusive with 'routing-key-expression'.<br/>Optional.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
|**8** |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         A SpEL expression that is evaluated to determine the routing key to use when sending messages, with the message as the root object (for example, 'payload.key').<br/>By default, this is an empty `String`.<br/>Mutually exclusive with 'routing-key'.<br/>Optional.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|**9** |                                                                                                                                                                                                                                                                                                                                                                                                      The default delivery mode for messages: `PERSISTENT` or `NON_PERSISTENT`.<br/>Overridden if the `header-mapper` sets the delivery mode.<br/>If the Spring Integration message header `amqp_deliveryMode` is present, the `DefaultHeaderMapper` sets the value.<br/>If this attribute is not supplied and the header mapper does not set it, the default depends on the underlying Spring AMQP `MessagePropertiesConverter` used by the `RabbitTemplate`.<br/>If that is not customized at all, the default is `PERSISTENT`.<br/>Optional.                                                                                                                                                                                                                                                                                                                                                                                                       |
|**10**|An expression that defines correlation data.<br/>When provided, this configures the underlying AMQP template to receive publisher confirmations.<br/>Requires a dedicated `RabbitTemplate` and a `CachingConnectionFactory` with the `publisherConfirms` property set to `true`.<br/>When a publisher confirmation is received and correlation data is supplied, it is written to either the `confirm-ack-channel` or the `confirm-nack-channel`, depending on the confirmation type.<br/>The payload of the confirmation is the correlation data, as defined by this expression.<br/>The message has an 'amqp\_publishConfirm' header set to `true` (`ack`) or `false` (`nack`).<br/>Examples: `headers['myCorrelationData']` and `payload`.<br/>Version 4.1 introduced the `amqp_publishConfirmNackCause` message header.<br/>It contains the `cause` of a 'nack' for a publisher confirmation.<br/>Starting with version 4.2, if the expression resolves to a `Message<?>` instance (such as `#this`), the message emitted on the `ack`/`nack` channel is based on that message, with the additional header(s) added.<br/>Previously, a new message was created with the correlation data as its payload, regardless of type.<br/>Also see [Alternative Mechanism for Publisher Confirms and Returns](#alternative-confirms-returns).<br/>Optional.|
|**11**|                                                                                                                                                                                                                                                                                                                                                                                                                                          The channel to which positive (`ack`) publisher confirms are sent.<br/>The payload is the correlation data defined by the `confirm-correlation-expression`.<br/>If the expression is `#root` or `#this`, the message is built from the original message, with the `amqp_publishConfirm` header set to `true`.<br/>Also see [Alternative Mechanism for Publisher Confirms and Returns](#alternative-confirms-returns).<br/>Optional (the default is `nullChannel`).                                                                                                                                                                                                                                                                                                                                                                                                                                          |
|**12**|                                                                                                                                                                                                                                                                                                                                               The channel to which negative (`nack`) publisher confirmations are sent.<br/>The payload is the correlation data defined by the `confirm-correlation-expression` (if there is no `ErrorMessageStrategy` configured).<br/>If the expression is `#root` or `#this`, the message is built from the original message, with the `amqp_publishConfirm` header set to `false`.<br/>When there is an `ErrorMessageStrategy`, the message is an `ErrorMessage` with a `NackedAmqpMessageException` payload.<br/>Also see [Alternative Mechanism for Publisher Confirms and Returns](#alternative-confirms-returns).<br/>Optional (the default is `nullChannel`).                                                                                                                                                                                                                                                                                                                                                |
|**13**|                                                                                                                                                                                                                                                                                                                                                                                                                                                     When set, the adapter will synthesize a negative acknowledgment (nack) if a publisher confirm is not received within this time in milliseconds.<br/>Pending confirms are checked every 50% of this value, so the actual time a nack is sent will be between 1x and 1.5x this value.<br/>Also see [Alternative Mechanism for Publisher Confirms and Returns](#alternative-confirms-returns).<br/>Default none (nacks will not be generated).                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
|**14**|                                                                                                                                                                                                                                                                                                                                                                                                  When set to true, the calling thread will block, waiting for a publisher confirmation.<br/>This requires a `RabbitTemplate` configured for confirms as well as a `confirm-correlation-expression`.<br/>The thread will block for up to `confirm-timeout` (or 5 seconds by default).<br/>If a timeout occurs, a `MessageTimeoutException` will be thrown.<br/>If returns are enabled and a message is returned, or any other exception occurs while awaiting the confirm, a `MessageHandlingException` will be thrown, with an appropriate message.                                                                                                                                                                                                                                                                                                                                                                                                  |
|**15**|                                                                                                                                                                                                                                                                                                                                    The channel to which returned messages are sent.<br/>When provided, the underlying AMQP template is configured to return undeliverable messages to the adapter.<br/>When there is no `ErrorMessageStrategy` configured, the message is constructed from the data received from AMQP, with the following additional headers: `amqp_returnReplyCode`, `amqp_returnReplyText`, `amqp_returnExchange`, `amqp_returnRoutingKey`.<br/>When there is an `ErrorMessageStrategy`, the message is an `ErrorMessage` with a `ReturnedAmqpMessageException` payload.<br/>Also see [Alternative Mechanism for Publisher Confirms and Returns](#alternative-confirms-returns).<br/>Optional.                                                                                                                                                                                                                                                                                                                                    |
|**16**|                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              A reference to an `ErrorMessageStrategy` implementation used to build `ErrorMessage` instances when sending returned or negatively acknowledged messages.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
|**17**|                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     A reference to an `AmqpHeaderMapper` to use when sending AMQP Messages.<br/>By default, only standard AMQP properties (such as `contentType`) are copied to the Spring Integration `MessageHeaders`.<br/>Any user-defined headers is not copied to the message by the default`DefaultAmqpHeaderMapper`.<br/>Not allowed if 'request-header-names' is provided.<br/>Optional.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|**18**|                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              Comma-separated list of names of AMQP Headers to be mapped from the `MessageHeaders` to the AMQP Message.<br/>Not allowed if the 'header-mapper' reference is provided.<br/>The values in this list can also be simple patterns to be matched against the header names (e.g. `"*"` or `"thing1*, thing2"` or `"*thing1"`).                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
|**19**|                                                                                                                                                                                                                                                                                                                                                                                                                                                                   When set to `false`, the endpoint attempts to connect to the broker during application context initialization.<br/>This allows “fail fast” detection of bad configuration but also causes initialization to fail if the broker is down.<br/>When `true` (the default), the connection is established (if it does not already exist because some other component established it) when the first message is sent.                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
|**20**|                                                                                                                                                                                                                                                                                                                                                                                                                                When set to `true`, payloads of type `Iterable<Message<?>>` will be sent as discrete messages on the same channel within the scope of a single `RabbitTemplate` invocation.<br/>Requires a `RabbitTemplate`.<br/>When `wait-for-confirms` is true, `RabbitTemplate.waitForConfirmsOrDie()` is invoked after the messages have been sent.<br/>With a transactional template, the sends will be performed in either a new transaction or one that has already been started (if present).                                                                                                                                                                                                                                                                                                                                                                                                                                |

|   |return-channel<br/><br/>Using a `return-channel` requires a `RabbitTemplate` with the `mandatory` property set to `true` and a `CachingConnectionFactory` with the `publisherReturns` property set to `true`.<br/>When using multiple outbound endpoints with returns, a separate `RabbitTemplate` is needed for each endpoint.|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

### Outbound Gateway

The following listing shows the possible properties for an AMQP Outbound Gateway:

Java DSL

```
@Bean
public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate) {
    return f -> f.handle(Amqp.outboundGateway(amqpTemplate)
                    .routingKey("foo")) // default exchange - route to queue 'foo'
            .get();
}

@MessagingGateway(defaultRequestChannel = "amqpOutbound.input")
public interface MyGateway {

    String sendToRabbit(String data);

}
```

Java

```
@Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {
    AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
    outbound.setExpectReply(true);
    outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
    return outbound;
}

@Bean
public MessageChannel amqpOutboundChannel() {
    return new DirectChannel();
}

@MessagingGateway(defaultRequestChannel = "amqpOutboundChannel")
public interface MyGateway {

    String sendToRabbit(String data);

}
```

XML

```
<int-amqp:outbound-gateway id="outboundGateway"               (1)
                           request-channel="myRequestChannel" (2)
                           amqp-template=""                   (3)
                           exchange-name=""                   (4)
                           exchange-name-expression=""        (5)
                           order="1"                          (6)
                           reply-channel=""                   (7)
                           reply-timeout=""                   (8)
                           requires-reply=""                  (9)
                           routing-key=""                     (10)
                           routing-key-expression=""          (11)
                           default-delivery-mode""            (12)
                           confirm-correlation-expression=""  (13)
                           confirm-ack-channel=""             (14)
                           confirm-nack-channel=""            (15)
                           confirm-timeout=""                 (16)
                           return-channel=""                  (17)
                           error-message-strategy=""          (18)
                           lazy-connect="true" />             (19)
```

|**1** |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       The unique ID for this adapter.<br/>Optional.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|**2** |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    Message channel to which messages are sent to have them converted and published to an AMQP exchange.<br/>Required.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|**3** |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                Bean reference to the configured AMQP template.<br/>Optional (defaults to `amqpTemplate`).                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
|**4** |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      The name of the AMQP exchange to which messages should be sent.<br/>If not provided, messages are sent to the default, no-name cxchange.<br/>Mutually exclusive with 'exchange-name-expression'.<br/>Optional.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|**5** |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 A SpEL expression that is evaluated to determine the name of the AMQP exchange to which messages should be sent, with the message as the root object.<br/>If not provided, messages are sent to the default, no-name exchange.<br/>Mutually exclusive with 'exchange-name'.<br/>Optional.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
|**6** |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              The order for this consumer when multiple consumers are registered, thereby enabling load-balancing and failover.<br/>Optional (defaults to `Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]`).                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
|**7** |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   Message channel to which replies should be sent after being received from an AMQP queue and converted.<br/>Optional.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
|**8** |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        The time the gateway waits when sending the reply message to the `reply-channel`.<br/>This only applies if the `reply-channel` can block — such as a `QueueChannel` with a capacity limit that is currently full.<br/>Defaults to infinity.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
|**9** |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 When `true`, the gateway throws an exception if no reply message is received within the `AmqpTemplate’s `replyTimeout` property.<br/>Defaults to `true`.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
|**10**|                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              The `routing-key` to use when sending messages.<br/>By default, this is an empty `String`.<br/>Mutually exclusive with 'routing-key-expression'.<br/>Optional.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
|**11**|                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          A SpEL expression that is evaluated to determine the `routing-key` to use when sending messages, with the message as the root object (for example, 'payload.key').<br/>By default, this is an empty `String`.<br/>Mutually exclusive with 'routing-key'.<br/>Optional.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
|**12**|                                                                                                                                                                                                                                                                                                                                                                         The default delivery mode for messages: `PERSISTENT` or `NON_PERSISTENT`.<br/>Overridden if the `header-mapper` sets the delivery mode.<br/>If the Spring Integration message header `amqp_deliveryMode` is present, the `DefaultHeaderMapper` sets the value.<br/>If this attribute is not supplied and the header mapper does not set it, the default depends on the underlying Spring AMQP `MessagePropertiesConverter` used by the `RabbitTemplate`.<br/>If that is not customized at all, the default is `PERSISTENT`.<br/>Optional.                                                                                                                                                                                                                                                                                                                                                                         |
|**13**|Since version 4.2.<br/>An expression defining correlation data.<br/>When provided, this configures the underlying AMQP template to receive publisher confirms.<br/>Requires a dedicated `RabbitTemplate` and a `CachingConnectionFactory` with the `publisherConfirms` property set to `true`.<br/>When a publisher confirm is received and correlation data is supplied, it is written to either the `confirm-ack-channel` or the `confirm-nack-channel`, depending on the confirmation type.<br/>The payload of the confirm is the correlation data, as defined by this expression.<br/>The message has a header 'amqp\_publishConfirm' set to `true` (`ack`) or `false` (`nack`).<br/>For `nack` confirmations, Spring Integration provides an additional header `amqp_publishConfirmNackCause`.<br/>Examples: `headers['myCorrelationData']` and `payload`.<br/>If the expression resolves to a `Message<?>` instance (such as `#this`), the message<br/>emitted on the `ack`/`nack` channel is based on that message, with the additional headers added.<br/>Previously, a new message was created with the correlation data as its payload, regardless of type.<br/>Also see [Alternative Mechanism for Publisher Confirms and Returns](#alternative-confirms-returns).<br/>Optional.|
|**14**|                                                                                                                                                                                                                                                                                                                                                                                                            The channel to which positive (`ack`) publisher confirmations are sent.<br/>The payload is the correlation data defined by `confirm-correlation-expression`.<br/>If the expression is `#root` or `#this`, the message is built from the original message, with the `amqp_publishConfirm` header set to `true`.<br/>Also see [Alternative Mechanism for Publisher Confirms and Returns](#alternative-confirms-returns).<br/>Optional (the default is `nullChannel`).                                                                                                                                                                                                                                                                                                                                                                                                            |
|**15**|                                                                                                                                                                                                                                                                                                                    The channel to which negative (`nack`) publisher confirmations are sent.<br/>The payload is the correlation data defined by `confirm-correlation-expression` (if there is no `ErrorMessageStrategy` configured).<br/>If the expression is `#root` or `#this`, the message is built from the original message, with the `amqp_publishConfirm` header set to `false`.<br/>When there is an `ErrorMessageStrategy`, the message is an `ErrorMessage` with a `NackedAmqpMessageException` payload.<br/>Also see [Alternative Mechanism for Publisher Confirms and Returns](#alternative-confirms-returns).<br/>Optional (the default is `nullChannel`).                                                                                                                                                                                                                                                                                                                    |
|**16**|                                                                                                                                                                                                                                                                                                                                                                                                                                                                            When set, the gateway will synthesize a negative acknowledgment (nack) if a publisher confirm is not received within this time in milliseconds.<br/>Pending confirms are checked every 50% of this value, so the actual time a nack is sent will be between 1x and 1.5x this value.<br/>Default none (nacks will not be generated).                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
|**17**|                                                                                                                                                                                                                                                                                                    The channel to which returned messages are sent.<br/>When provided, the underlying AMQP template is configured to return undeliverable messages to the adapter.<br/>When there is no `ErrorMessageStrategy` configured, the message is constructed from the data received from AMQP, with the following additional headers: `amqp_returnReplyCode`, `amqp_returnReplyText`, `amqp_returnExchange`, and `amqp_returnRoutingKey`.<br/>When there is an `ErrorMessageStrategy`, the message is an `ErrorMessage` with a `ReturnedAmqpMessageException` payload.<br/>Also see [Alternative Mechanism for Publisher Confirms and Returns](#alternative-confirms-returns).<br/>Optional.                                                                                                                                                                                                                                                                                                     |
|**18**|                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 A reference to an `ErrorMessageStrategy` implementation used to build `ErrorMessage` instances when sending returned or negatively acknowledged messages.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
|**19**|                                                                                                                                                                                                                                                                                                                                                                                                                                           When set to `false`, the endpoint attempts to connect to the broker during application context initialization.<br/>This allows “fail fast” detection of bad configuration by logging an error message if the broker is down.<br/>When `true` (the default), the connection is established (if it does not already exist because some other component established it) when the first message is sent.                                                                                                                                                                                                                                                                                                                                                                                                                                            |

|   |return-channel<br/><br/>Using a `return-channel` requires a `RabbitTemplate` with the `mandatory` property set to `true` and a `CachingConnectionFactory` with the `publisherReturns` property set to `true`.<br/>When using multiple outbound endpoints with returns, a separate `RabbitTemplate` is needed for each endpoint.|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

|   |The underlying `AmqpTemplate` has a default `replyTimeout` of five seconds.<br/>If you require a longer timeout, you must configure it on the `template`.|
|---|---------------------------------------------------------------------------------------------------------------------------------------------------------|

Note that the only difference between the outbound adapter and outbound gateway configuration is the setting of the`expectReply` property.

### Asynchronous Outbound Gateway

The gateway discussed in the previous section is synchronous, in that the sending thread is suspended until a
reply is received (or a timeout occurs).
Spring Integration version 4.3 added an asynchronous gateway, which uses the `AsyncRabbitTemplate` from Spring AMQP.
When a message is sent, the thread returns immediately after the send operation completes, and, when the message is received, the reply is sent on the template’s listener container thread.
This can be useful when the gateway is invoked on a poller thread.
The thread is released and is available for other tasks in the framework.

The following listing shows the possible configuration options for an AMQP asynchronous outbound gateway:

Java DSL

```
@Configuration
public class AmqpAsyncApplication {

    @Bean
    public IntegrationFlow asyncAmqpOutbound(AsyncRabbitTemplate asyncRabbitTemplate) {
        return f -> f
                .handle(Amqp.asyncOutboundGateway(asyncRabbitTemplate)
                        .routingKey("queue1")); // default exchange - route to queue 'queue1'
    }

    @MessagingGateway(defaultRequestChannel = "asyncAmqpOutbound.input")
    public interface MyGateway {

        String sendToRabbit(String data);

    }

}
```

Java

```
@Configuration
public class AmqpAsyncConfig {

    @Bean
    @ServiceActivator(inputChannel = "amqpOutboundChannel")
    public AsyncAmqpOutboundGateway amqpOutbound(AsyncRabbitTemplate asyncTemplate) {
        AsyncAmqpOutboundGateway outbound = new AsyncAmqpOutboundGateway(asyncTemplate);
        outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
        return outbound;
    }

    @Bean
    public AsyncRabbitTemplate asyncTemplate(RabbitTemplate rabbitTemplate,
                     SimpleMessageListenerContainer replyContainer) {

        return new AsyncRabbitTemplate(rabbitTemplate, replyContainer);
    }

    @Bean
    public SimpleMessageListenerContainer replyContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(ccf);
        container.setQueueNames("asyncRQ1");
        return container;
    }

    @Bean
    public MessageChannel amqpOutboundChannel() {
        return new DirectChannel();
    }

}
```

XML

```
<int-amqp:outbound-async-gateway id="asyncOutboundGateway"    (1)
                           request-channel="myRequestChannel" (2)
                           async-template=""                  (3)
                           exchange-name=""                   (4)
                           exchange-name-expression=""        (5)
                           order="1"                          (6)
                           reply-channel=""                   (7)
                           reply-timeout=""                   (8)
                           requires-reply=""                  (9)
                           routing-key=""                     (10)
                           routing-key-expression=""          (11)
                           default-delivery-mode""            (12)
                           confirm-correlation-expression=""  (13)
                           confirm-ack-channel=""             (14)
                           confirm-nack-channel=""            (15)
                           confirm-timeout=""                 (16)
                           return-channel=""                  (17)
                           lazy-connect="true" />             (18)
```

|**1** |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            The unique ID for this adapter.<br/>Optional.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
|------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|**2** |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  Message channel to which messages should be sent in order to have them converted and published to an AMQP exchange.<br/>Required.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
|**3** |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            Bean reference to the configured `AsyncRabbitTemplate`.<br/>Optional (it defaults to `asyncRabbitTemplate`).                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
|**4** |                                                                                                                                                                                                                                                                                                                                                                                                                                                                           The name of the AMQP exchange to which messages should be sent.<br/>If not provided, messages are sent to the default, no-name exchange.<br/>Mutually exclusive with 'exchange-name-expression'.<br/>Optional.                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
|**5** |                                                                                                                                                                                                                                                                                                                                                                                                                                         A SpEL expression that is evaluated to determine the name of the AMQP exchange to which messages are sent, with the message as the root object.<br/>If not provided, messages are sent to the default, no-name exchange.<br/>Mutually exclusive with 'exchange-name'.<br/>Optional.                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|**6** |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  The order for this consumer when multiple consumers are registered, thereby enabling load-balancing and failover.<br/>Optional (it defaults to `Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]`).                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
|**7** |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        Message channel to which replies should be sent after being received from an AMQP queue and converted.<br/>Optional.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|**8** |                                                                                                                                                                                                                                                                                                                                                                                                                                                           The time the gateway waits when sending the reply message to the `reply-channel`.<br/>This only applies if the `reply-channel` can block — such as a `QueueChannel` with a capacity limit that is currently full.<br/>The default is infinity.                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
|**9** |                                                                                                                                                                                                                                                                                                                                                       When no reply message is received within the `AsyncRabbitTemplate’s `receiveTimeout` property and this setting is `true`, the gateway sends an error message to the inbound message’s `errorChannel` header.<br/>When no reply message is received within the `AsyncRabbitTemplate’s `receiveTimeout` property and this setting is `false`, the gateway sends an error message to the default `errorChannel` (if available).<br/>It defaults to `true`.                                                                                                                                                                                                                                                                                                                                                       |
|**10**|                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    The routing-key to use when sending Messages.<br/>By default, this is an empty `String`.<br/>Mutually exclusive with 'routing-key-expression'.<br/>Optional.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|**11**|                                                                                                                                                                                                                                                                                                                                                                                                                                              A SpEL expression that is evaluated to determine the routing-key to use when sending messages,<br/>with the message as the root object (for example, 'payload.key').<br/>By default, this is an empty `String`.<br/>Mutually exclusive with 'routing-key'.<br/>Optional.                                                                                                                                                                                                                                                                                                                                                                                                                                               |
|**12**|                                                                                                                                                                                                                                                                                                                The default delivery mode for messages: `PERSISTENT` or `NON_PERSISTENT`.<br/>Overridden if the `header-mapper` sets the delivery mode.<br/>If the Spring Integration message header (`amqp_deliveryMode`) is present, the `DefaultHeaderMapper` sets the value.<br/>If this attribute is not supplied and the header mapper does not set it, the default depends on the underlying Spring AMQP `MessagePropertiesConverter` used by the `RabbitTemplate`.<br/>If that is not customized, the default is `PERSISTENT`.<br/>Optional.                                                                                                                                                                                                                                                                                                                 |
|**13**|An expression that defines correlation data.<br/>When provided, this configures the underlying AMQP template to receive publisher confirmations.<br/>Requires a dedicated `RabbitTemplate` and a `CachingConnectionFactory` with its `publisherConfirms` property set to `true`.<br/>When a publisher confirmation is received and correlation data is supplied, the confirmation is written to either the `confirm-ack-channel` or the `confirm-nack-channel`, depending on the confirmation type.<br/>The payload of the confirmation is the correlation data as defined by this expression, and the message has its 'amqp\_publishConfirm' header set to `true` (`ack`) or `false` (`nack`).<br/>For `nack` instances, an additional header (`amqp_publishConfirmNackCause`) is provided.<br/>Examples: `headers['myCorrelationData']`, `payload`.<br/>If the expression resolves to a `Message<?>` instance (such as “#this”), the message emitted on the `ack`/`nack` channel is based on that message, with the additional headers added.<br/>Also see [Alternative Mechanism for Publisher Confirms and Returns](#alternative-confirms-returns).<br/>Optional.|
|**14**|                                                                                                                                                                                                                                                                                                                                                                    The channel to which positive (`ack`) publisher confirmations are sent.<br/>The payload is the correlation data defined by the `confirm-correlation-expression`.<br/>Requires the underlying `AsyncRabbitTemplate` to have its `enableConfirms` property set to `true`.<br/>Also see [Alternative Mechanism for Publisher Confirms and Returns](#alternative-confirms-returns).<br/>Optional (the default is `nullChannel`).                                                                                                                                                                                                                                                                                                                                                                     |
|**15**|                                                                                                                                                                                                                                                                                                                                                        Since version 4.2.<br/>The channel to which negative (`nack`) publisher confirmations are sent.<br/>The payload is the correlation data defined by the `confirm-correlation-expression`.<br/>Requires the underlying `AsyncRabbitTemplate` to have its `enableConfirms` property set to `true`.<br/>Also see [Alternative Mechanism for Publisher Confirms and Returns](#alternative-confirms-returns).<br/>Optional (the default is `nullChannel`).                                                                                                                                                                                                                                                                                                                                                         |
|**16**|                                                                                                                                                                                                                                                                                                                                                             When set, the gateway will synthesize a negative acknowledgment (nack) if a publisher confirm is not received within this time in milliseconds.<br/>Pending confirms are checked every 50% of this value, so the actual time a nack is sent will be between 1x and 1.5x this value.<br/>Also see [Alternative Mechanism for Publisher Confirms and Returns](#alternative-confirms-returns).<br/>Default none (nacks will not be generated).                                                                                                                                                                                                                                                                                                                                                             |
|**17**|                                                                                                                                                                                                                                                                                 The channel to which returned messages are sent.<br/>When provided, the underlying AMQP template is configured to return undeliverable messages to the gateway.<br/>The message is constructed from the data received from AMQP, with the following additional headers: `amqp_returnReplyCode`, `amqp_returnReplyText`, `amqp_returnExchange`, and `amqp_returnRoutingKey`.<br/>Requires the underlying `AsyncRabbitTemplate` to have its `mandatory` property set to `true`.<br/>Also see [Alternative Mechanism for Publisher Confirms and Returns](#alternative-confirms-returns).<br/>Optional.                                                                                                                                                                                                                                                                                 |
|**18**|                                                                                                                                                                                                                                                                                                                                                                             When set to `false`, the endpoint tries to connect to the broker during application context initialization.<br/>Doing so allows “fail fast” detection of bad configuration, by logging an error message if the broker is down.<br/>When `true` (the default), the connection is established (if it does not already exist because some other component established<br/>it) when the first message is sent.                                                                                                                                                                                                                                                                                                                                                                              |

See also [Asynchronous Service Activator](./service-activator.html#async-service-activator) for more information.

|   |RabbitTemplate<br/><br/>When you use confirmations and returns, we recommend that the `RabbitTemplate` wired into the `AsyncRabbitTemplate` be dedicated.<br/>Otherwise, unexpected side-effects may be encountered.|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

### Alternative Mechanism for Publisher Confirms and Returns

When the connection factory is configured for publisher confirms and returns, the sections above discuss the configuration of message channels to receive the confirms and returns asynchronously.
Starting with version 5.4, there is an additional mechanism which is generally easier to use.

In this case, do not configure a `confirm-correlation-expression` or the confirm and return channels.
Instead, add a `CorrelationData` instance in the `AmqpHeaders.PUBLISH_CONFIRM_CORRELATION` header; you can then wait for the result(s) later, by checking the state of the future in the `CorrelationData` instances for which you have sent messages.
The `returnedMessage` field will always be populated (if a message is returned) before the future is completed.

```
CorrelationData corr = new CorrelationData("someId"); // <--- Unique "id" is required for returns
someFlow.getInputChannel().send(MessageBuilder.withPayload("test")
        .setHeader("rk", "someKeyThatWontRoute")
        .setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, corr)
        .build());
...
try {
    Confirm Confirm = corr.getFuture().get(10, TimeUnit.SECONDS);
    Message returned = corr.getReturnedMessage();
    if (returned !- null) {
        // message could not be routed
    }
}
catch { ... }
```

To improve performance, you may wish to send multiple messages and wait for the confirmations later, rather than one-at-a-time.
The returned message is the raw message after conversion; you can subclass `CorrelationData` with whatever additional data you need.

### Inbound Message Conversion

Inbound messages, arriving at the channel adapter or gateway, are converted to the `spring-messaging` `Message<?>` payload using a message converter.
By default, a `SimpleMessageConverter` is used, which handles java serialization and text.
Headers are mapped using the `DefaultHeaderMapper.inboundMapper()` by default.
If a conversion error occurs, and there is no error channel defined, the exception is thrown to the container and handled by the listener container’s error handler.
The default error handler treats conversion errors as fatal and the message will be rejected (and routed to a dead-letter exchange, if the queue is so configured).
If an error channel is defined, the `ErrorMessage` payload is a `ListenerExecutionFailedException` with properties `failedMessage` (the Spring AMQP message that could not be converted) and the `cause`.
If the container `AcknowledgeMode` is `AUTO` (the default) and the error flow consumes the error without throwing an exception, the original message will be acknowledged.
If the error flow throws an exception, the exception type, in conjunction with the container’s error handler, will determine whether or not the message is requeued.
If the container is configured with `AcknowledgeMode.MANUAL`, the payload is a `ManualAckListenerExecutionFailedException` with additional properties `channel` and `deliveryTag`.
This enables the error flow to call `basicAck` or `basicNack` (or `basicReject`) for the message, to control its disposition.

### Outbound Message Conversion

Spring AMQP 1.4 introduced the `ContentTypeDelegatingMessageConverter`, where the actual converter is selected based
on the incoming content type message property.
This can be used by inbound endpoints.

As of Spring Integration version 4.3, you can use the `ContentTypeDelegatingMessageConverter` on outbound endpoints as well, with the `contentType` header specifying which converter is used.

The following example configures a `ContentTypeDelegatingMessageConverter`, with the default converter being the `SimpleMessageConverter` (which handles Java serialization and plain text), together with a JSON converter:

```
<amqp:outbound-channel-adapter id="withContentTypeConverter" channel="ctRequestChannel"
                               exchange-name="someExchange"
                               routing-key="someKey"
                               amqp-template="amqpTemplateContentTypeConverter" />

<int:channel id="ctRequestChannel"/>

<rabbit:template id="amqpTemplateContentTypeConverter"
        connection-factory="connectionFactory" message-converter="ctConverter" />

<bean id="ctConverter"
        class="o.s.amqp.support.converter.ContentTypeDelegatingMessageConverter">
    <property name="delegates">
        <map>
            <entry key="application/json">
                <bean class="o.s.amqp.support.converter.Jackson2JsonMessageConverter" />
            </entry>
        </map>
    </property>
</bean>
```

Sending a message to `ctRequestChannel` with the `contentType` header set to `application/json` causes the JSON converter to be selected.

This applies to both the outbound channel adapter and gateway.

|   |Starting with version 5.0, headers that are added to the `MessageProperties` of the outbound message are never overwritten by mapped headers (by default).<br/>Previously, this was only the case if the message converter was a `ContentTypeDelegatingMessageConverter` (in that case, the header was mapped first so that the proper converter could be selected).<br/>For other converters, such as the `SimpleMessageConverter`, mapped headers overwrote any headers added by the converter.<br/>This caused problems when an outbound message had some leftover `contentType` headers (perhaps from an inbound channel adapter) and the correct outbound `contentType` was incorrectly overwritten.<br/>The work-around was to use a header filter to remove the header before sending the message to the outbound endpoint.<br/><br/>There are, however, cases where the previous behavior is desired — for example, when a `String` payload that contains JSON, the `SimpleMessageConverter` is not aware of the content and sets the `contentType` message property to `text/plain` but your application would like to override that to `application/json` by setting the `contentType` header of the message sent to the outbound endpoint.<br/>The `ObjectToJsonTransformer` does exactly that (by default).<br/><br/>There is now a property called `headersMappedLast` on the outbound channel adapter and gateway (as well as on AMQP-backed channels).<br/>Setting this to `true` restores the behavior of overwriting the property added by the converter.<br/><br/>Starting with version 5.1.9, a similar `replyHeadersMappedLast` is provided for the `AmqpInboundGateway` when we produce a reply and would like to override headers populated by the converter.<br/>See its JavaDocs for more information.|
|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

### Outbound User ID

Spring AMQP version 1.6 introduced a mechanism to allow the specification of a default user ID for outbound messages.
It has always been possible to set the `AmqpHeaders.USER_ID` header, which now takes precedence over the default.
This might be useful to message recipients.
For inbound messages, if the message publisher sets the property, it is made available in the `AmqpHeaders.RECEIVED_USER_ID` header.
Note that RabbitMQ [validates that the user ID is the actual user ID for the connection or that the connection allows impersonation](https://www.rabbitmq.com/validated-user-id.html).

To configure a default user ID for outbound messages, configure it on a `RabbitTemplate` and configure the outbound adapter or gateway to use that template.
Similarly, to set the user ID property on replies, inject an appropriately configured template into the inbound gateway.
See the [Spring AMQP documentation](https://docs.spring.io/spring-amqp/reference/html/_reference.html#template-user-id) for more information.

### Delayed Message Exchange

Spring AMQP supports the [RabbitMQ Delayed Message Exchange Plugin](https://docs.spring.io/spring-amqp/reference/html/#delayed-message-exchange).
For inbound messages, the `x-delay` header is mapped to the `AmqpHeaders.RECEIVED_DELAY` header.
Setting the `AMQPHeaders.DELAY` header causes the corresponding `x-delay` header to be set in outbound messages.
You can also specify the `delay` and `delayExpression` properties on outbound endpoints (`delay-expression` when using XML configuration).
These properties take precedence over the `AmqpHeaders.DELAY` header.

### AMQP-backed Message Channels

There are two message channel implementations available.
One is point-to-point, and the other is publish-subscribe.
Both of these channels provide a wide range of configuration attributes for the underlying `AmqpTemplate` and`SimpleMessageListenerContainer` (as shown earlier in this chapter for the channel adapters and gateways).
However, the examples we show here have minimal configuration.
Explore the XML schema to view the available attributes.

A point-to-point channel might look like the following example:

```
<int-amqp:channel id="p2pChannel"/>
```

Under the covers, the preceding example causes a `Queue` named `si.p2pChannel` to be declared, and this channel sends to that `Queue` (technically, by sending to the no-name direct exchange with a routing key that matches the name of this `Queue`).
This channel also registers a consumer on that `Queue`.
If you want the channel to be “pollable” instead of message-driven, provide the `message-driven` flag with a value of `false`, as the following example shows:

```
<int-amqp:channel id="p2pPollableChannel"  message-driven="false"/>
```

A publish-subscribe channel might look like the following:

```
<int-amqp:publish-subscribe-channel id="pubSubChannel"/>
```

Under the covers, the preceding example causes a fanout exchange named `si.fanout.pubSubChannel` to be declared, and this channel sends to that fanout exchange.
This channel also declares a server-named exclusive, auto-delete, non-durable `Queue` and binds that to the fanout exchange while registering a consumer on that `Queue` to receive messages.
There is no “pollable” option for a publish-subscribe-channel.
It must be message-driven.

Starting with version 4.1, AMQP-backed message channels (in conjunction with `channel-transacted`) support`template-channel-transacted` to separate `transactional` configuration for the `AbstractMessageListenerContainer` and
for the `RabbitTemplate`.
Note that, previously, `channel-transacted` was `true` by default.
Now, by default, it is `false` for the `AbstractMessageListenerContainer`.

Prior to version 4.3, AMQP-backed channels only supported messages with `Serializable` payloads and headers.
The entire message was converted (serialized) and sent to RabbitMQ.
Now, you can set the `extract-payload` attribute (or `setExtractPayload()` when using Java configuration) to `true`.
When this flag is `true`, the message payload is converted and the headers are mapped, in a manner similar to when you use channel adapters.
This arrangement lets AMQP-backed channels be used with non-serializable payloads (perhaps with another message converter, such as the `Jackson2JsonMessageConverter`).
See [AMQP Message Headers](#amqp-message-headers) for more about the default mapped headers.
You can modify the mapping by providing custom mappers that use the `outbound-header-mapper` and `inbound-header-mapper` attributes.
You can now also specify a `default-delivery-mode`, which is used to set the delivery mode when there is no `amqp_deliveryMode` header.
By default, Spring AMQP `MessageProperties` uses `PERSISTENT` delivery mode.

|   |As with other persistence-backed channels, AMQP-backed channels are intended to provide message persistence to avoid message loss.<br/>They are not intended to distribute work to other peer applications.<br/>For that purpose, use channel adapters instead.|
|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

|   |Starting with version 5.0, the pollable channel now blocks the poller thread for the specified `receiveTimeout` (the default is 1 second).<br/>Previously, unlike other `PollableChannel` implementations, the thread returned immediately to the scheduler if no message was available, regardless of the receive timeout.<br/>Blocking is a little more expensive than using a `basicGet()` to retrieve a message (with no timeout), because a consumer has to be created to receive each message.<br/>To restore the previous behavior, set the poller’s `receiveTimeout` to 0.|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

#### Configuring with Java Configuration

The following example shows how to configure the channels with Java configuration:

```
@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("foo");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("bar");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("baz");
    factoryBean.setPubSub(false);
    return factoryBean;
}
```

#### Configuring with the Java DSL

The following example shows how to configure the channels with the Java DSL:

```
@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlows.from(...)
            ...
            .channel(Amqp.pollableChannel(connectionFactory)
                    .queueName("foo"))
            ...
            .get();
}

@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
    return IntegrationFlows.from(...)
            ...
            .channel(Amqp.channel(connectionFactory)
                    .queueName("bar"))
            ...
            .get();
}

@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlows.from(...)
            ...
            .channel(Amqp.publishSubscribeChannel(connectionFactory)
                    .queueName("baz"))
            ...
            .get();
}
```

### AMQP Message Headers

#### Overview

The Spring Integration AMQP Adapters automatically map all AMQP properties and headers.
(This is a change from 4.3 - previously, only standard headers were mapped).
By default, these properties are copied to and from Spring Integration `MessageHeaders` by using the[`DefaultAmqpHeaderMapper`](https://docs.spring.io/spring-integration/api/org/springframework/integration/amqp/support/DefaultAmqpHeaderMapper.html).

You can pass in your own implementation of AMQP-specific header mappers, as the adapters have properties to support doing so.

Any user-defined headers within the AMQP [`MessageProperties`](https://docs.spring.io/spring-amqp/api/org/springframework/amqp/core/MessageProperties.html) are copied to or from an AMQP message, unless explicitly negated by the `requestHeaderNames` or `replyHeaderNames` properties of the `DefaultAmqpHeaderMapper`.
By default, for an outbound mapper, no `x-*` headers are mapped.
See the [caution](#header-copy-caution) that appears later in this section for why.

To override the default and revert to the pre-4.3 behavior, use `STANDARD_REQUEST_HEADERS` and`STANDARD_REPLY_HEADERS` in the properties.

|   |When mapping user-defined headers, the values can also contain simple wildcard patterns (such as `thing*` or `*thing`) to be matched.<br/>The `*` matches all headers.|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|

Starting with version 4.1, the `AbstractHeaderMapper` (a `DefaultAmqpHeaderMapper` superclass) lets the `NON_STANDARD_HEADERS` token be configured for the `requestHeaderNames` and `replyHeaderNames` properties (in addition to the existing `STANDARD_REQUEST_HEADERS` and `STANDARD_REPLY_HEADERS`) to map all user-defined headers.

The `org.springframework.amqp.support.AmqpHeaders` class identifies the default headers that are used by the `DefaultAmqpHeaderMapper`:

* `amqp_appId`

* `amqp_clusterId`

* `amqp_contentEncoding`

* `amqp_contentLength`

* `content-type` (see [The `contentType` Header](#amqp-content-type))

* `amqp_correlationId`

* `amqp_delay`

* `amqp_deliveryMode`

* `amqp_deliveryTag`

* `amqp_expiration`

* `amqp_messageCount`

* `amqp_messageId`

* `amqp_receivedDelay`

* `amqp_receivedDeliveryMode`

* `amqp_receivedExchange`

* `amqp_receivedRoutingKey`

* `amqp_redelivered`

* `amqp_replyTo`

* `amqp_timestamp`

* `amqp_type`

* `amqp_userId`

* `amqp_publishConfirm`

* `amqp_publishConfirmNackCause`

* `amqp_returnReplyCode`

* `amqp_returnReplyText`

* `amqp_returnExchange`

* `amqp_returnRoutingKey`

* `amqp_channel`

* `amqp_consumerTag`

* `amqp_consumerQueue`

|   |As mentioned earlier in this section, using a header mapping pattern of `*` is a common way to copy all headers.<br/>However, this can have some unexpected side effects, because certain RabbitMQ proprietary properties/headers are also copied.<br/>For example, when you use [federation](https://www.rabbitmq.com/federated-exchanges.html), the received message may have a property named `x-received-from`, which contains the node that sent the message.<br/>If you use the wildcard character `*` for the request and reply header mapping on the inbound gateway, this header is copied, which may cause some issues with federation.<br/>This reply message may be federated back to the sending broker, which may think that a message is looping and, as a result, silently drop it.<br/>If you wish to use the convenience of wildcard header mapping, you may need to filter out some headers in the downstream flow.<br/>For example, to avoid copying the `x-received-from` header back to the reply you can use `<int:header-filter …​ header-names="x-received-from">` before sending the reply to the AMQP inbound gateway.<br/>Alternatively, you can explicitly list those properties that you actually want mapped, instead of using wildcards.<br/>For these reasons, for inbound messages, the mapper (by default) does not map any `x-*` headers.<br/>It also does not map the `deliveryMode` to the `amqp_deliveryMode` header, to avoid propagation of that header from an inbound message to an outbound message.<br/>Instead, this header is mapped to `amqp_receivedDeliveryMode`, which is not mapped on output.|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

Starting with version 4.3, patterns in the header mappings can be negated by preceding the pattern with `!`.
Negated patterns get priority, so a list such as `STANDARD_REQUEST_HEADERS,thing1,ba*,!thing2,!thing3,qux,!thing1` does not map `thing1` (nor `thing2` nor `thing3`).
The standard headers plus `bad` and `qux` are mapped.
The negation technique can be useful for example to not map JSON type headers for incoming messages when a JSON deserialization logic is done in the receiver downstream different way.
For this purpose a `!json_*` pattern should be configured for header mapper of the inbound channel adapter/gateway.

|   |If you have a user-defined header that begins with `!` that you do wish to map, you need to escape it with `\`, as follows: `STANDARD_REQUEST_HEADERS,\!myBangHeader`.<br/>The header named `!myBangHeader` is now mapped.|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

|   |Starting with version 5.1, the `DefaultAmqpHeaderMapper` will fall back to mapping `MessageHeaders.ID` and `MessageHeaders.TIMESTAMP` to `MessageProperties.messageId` and `MessageProperties.timestamp` respectively, if the corresponding `amqp_messageId` or `amqp_timestamp` headers are not present on outbound messages.<br/>Inbound properties will be mapped to the `amqp_*` headers as before.<br/>It is useful to populate the `messageId` property when message consumers are using stateful retry.|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

#### The `contentType` Header

Unlike other headers, the `AmqpHeaders.CONTENT_TYPE` is not prefixed with `amqp_`; this allows transparent passing of the contentType header across different technologies.
For example an inbound HTTP message sent to a RabbitMQ queue.

The `contentType` header is mapped to Spring AMQP’s `MessageProperties.contentType` property and that is subsequently mapped to RabbitMQ’s `content_type` property.

Prior to version 5.1, this header was also mapped as an entry in the `MessageProperties.headers` map; this was incorrect and, furthermore, the value could be wrong since the underlying Spring AMQP message converter might have changed the content type.
Such a change would be reflected in the first-class `content_type` property, but not in the RabbitMQ headers map.
Inbound mapping ignored the headers map value.`contentType` is no longer mapped to an entry in the headers map.

### Strict Message Ordering

This section describes message ordering for inbound and outbound messages.

#### Inbound

If you require strict ordering of inbound messages, you must configure the inbound listener container’s `prefetchCount` property to `1`.
This is because, if a message fails and is redelivered, it arrives after existing prefetched messages.
Since Spring AMQP version 2.0, the `prefetchCount` defaults to `250` for improved performance.
Strict ordering requirements come at the cost of decreased performance.

#### Outbound

Consider the following integration flow:

```
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlows.from(Gateway.class)
            .split(s -> s.delimiters(","))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}
```

Suppose we send messages `A`, `B` and `C` to the gateway.
While it is likely that messages `A`, `B`, `C` are sent in order, there is no guarantee.
This is because the template “borrows” a channel from the cache for each send operation, and there is no guarantee that the same channel is used for each message.
One solution is to start a transaction before the splitter, but transactions are expensive in RabbitMQ and can reduce performance several hundred fold.

To solve this problem in a more efficient manner, starting with version 5.1, Spring Integration provides the `BoundRabbitChannelAdvice` which is a `HandleMessageAdvice`.
See [Handling Message Advice](./handler-advice.html#handle-message-advice).
When applied before the splitter, it ensures that all downstream operations are performed on the same channel and, optionally, can wait until publisher confirmations for all sent messages are received (if the connection factory is configured for confirmations).
The following example shows how to use `BoundRabbitChannelAdvice`:

```
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlows.from(Gateway.class)
            .split(s -> s.delimiters(",")
                    .advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}
```

Notice that the same `RabbitTemplate` (which implements `RabbitOperations`) is used in the advice and the outbound adapter.
The advice runs the downstream flow within the template’s `invoke` method so that all operations run on the same channel.
If the optional timeout is provided, when the flow completes, the advice calls the `waitForConfirmsOrDie` method, which throws an exception if the confirmations are not received within the specified time.

|   |There must be no thread handoffs in the downstream flow (`QueueChannel`, `ExecutorChannel`, and others).|
|---|--------------------------------------------------------------------------------------------------------|

### AMQP Samples

To experiment with the AMQP adapters, check out the samples available in the Spring Integration samples git repository at [https://github.com/SpringSource/spring-integration-samples](https://github.com/spring-projects/spring-integration-samples)

Currently, one sample demonstrates the basic functionality of the Spring Integration AMQP adapter by using an outbound channel adapter and an inbound channel adapter.
As AMQP broker implementation in the sample uses [RabbitMQ](https://www.rabbitmq.com/).

|   |In order to run the example, you need a running instance of RabbitMQ.<br/>A local installation with just the basic defaults suffices.<br/>For detailed RabbitMQ installation procedures, see [https://www.rabbitmq.com/install.html](https://www.rabbitmq.com/install.html)|
|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

Once the sample application is started, enter some text on the command prompt and a message containing that entered text is dispatched to the AMQP queue.
In return, that message is retrieved by Spring Integration and printed to the console.

The following image illustrates the basic set of Spring Integration components used in this sample.

![spring integration amqp sample graph](https://docs.spring.io/spring-integration/docs/current/reference/html/images/spring-integration-amqp-sample-graph.png)

Figure 1. The Spring Integration graph of the AMQP sample