105.78529341.js 119.6 KB
Newer Older
茶陵後's avatar
茶陵後 已提交
1
(window.webpackJsonp=window.webpackJsonp||[]).push([[105],{530:function(e,t,n){"use strict";n.r(t);var a=n(56),r=Object(a.a)({},(function(){var e=this,t=e.$createElement,n=e._self._c||t;return n("ContentSlotsDistributor",{attrs:{"slot-key":e.$parent.slotKey}},[n("h1",{attrs:{id:"amqp-support"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#amqp-support"}},[e._v("#")]),e._v(" AMQP Support")]),e._v(" "),n("h2",{attrs:{id:"amqp-support-2"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#amqp-support-2"}},[e._v("#")]),e._v(" AMQP Support")]),e._v(" "),n("p",[e._v("Spring Integration provides channel adapters for receiving and sending messages by using the Advanced Message Queuing Protocol (AMQP).")]),e._v(" "),n("p",[e._v("You need to include this dependency into your project:")]),e._v(" "),n("p",[e._v("Maven")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v("<dependency>\n    <groupId>org.springframework.integration</groupId>\n    <artifactId>spring-integration-amqp</artifactId>\n    <version>5.5.9</version>\n</dependency>\n")])])]),n("p",[e._v("Gradle")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('compile "org.springframework.integration:spring-integration-amqp:5.5.9"\n')])])]),n("p",[e._v("The following adapters are available:")]),e._v(" "),n("ul",[n("li",[n("p",[n("a",{attrs:{href:"#amqp-inbound-channel-adapter"}},[e._v("Inbound Channel Adapter")])])]),e._v(" "),n("li",[n("p",[n("a",{attrs:{href:"#amqp-inbound-gateway"}},[e._v("Inbound Gateway")])])]),e._v(" "),n("li",[n("p",[n("a",{attrs:{href:"#amqp-outbound-channel-adapter"}},[e._v("Outbound Channel Adapter")])])]),e._v(" "),n("li",[n("p",[n("a",{attrs:{href:"#amqp-outbound-gateway"}},[e._v("Outbound Gateway")])])]),e._v(" "),n("li",[n("p",[n("a",{attrs:{href:"#amqp-async-outbound-gateway"}},[e._v("Async Outbound Gateway")])])])]),e._v(" 
"),n("p",[e._v("Spring Integration also provides a point-to-point message channel and a publish-subscribe message channel backed by AMQP Exchanges and Queues.")]),e._v(" "),n("p",[e._v("To provide AMQP support, Spring Integration relies on ("),n("a",{attrs:{href:"https://projects.spring.io/spring-amqp",target:"_blank",rel:"noopener noreferrer"}},[e._v("Spring AMQP"),n("OutboundLink")],1),e._v("), which applies core Spring concepts to the development of AMQP-based messaging solutions.\nSpring AMQP provides similar semantics to ("),n("a",{attrs:{href:"https://docs.spring.io/spring/docs/current/spring-framework-reference/integration.html#jms",target:"_blank",rel:"noopener noreferrer"}},[e._v("Spring JMS"),n("OutboundLink")],1),e._v(").")]),e._v(" "),n("p",[e._v("Whereas the provided AMQP Channel Adapters are intended for unidirectional messaging (send or receive) only, Spring Integration also provides inbound and outbound AMQP gateways for request-reply operations.")]),e._v(" "),n("p",[e._v("TIP:\nYou should familiarize yourself with the "),n("a",{attrs:{href:"https://docs.spring.io/spring-amqp/reference/html/",target:"_blank",rel:"noopener noreferrer"}},[e._v("reference documentation of the Spring AMQP project"),n("OutboundLink")],1),e._v(".\nIt provides much more in-depth information about Spring’s integration with AMQP in general and RabbitMQ in particular.")]),e._v(" "),n("h3",{attrs:{id:"inbound-channel-adapter"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#inbound-channel-adapter"}},[e._v("#")]),e._v(" Inbound Channel Adapter")]),e._v(" "),n("p",[e._v("The following listing shows the possible configuration options for an AMQP Inbound Channel Adapter:")]),e._v(" "),n("p",[e._v("Java DSL")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('@Bean\npublic IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {\n    return 
IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "aName"))\n            .handle(m -> System.out.println(m.getPayload()))\n            .get();\n}\n')])])]),n("p",[e._v("Java")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('@Bean\npublic MessageChannel amqpInputChannel() {\n    return new DirectChannel();\n}\n\n@Bean\npublic AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,\n        @Qualifier("amqpInputChannel") MessageChannel channel) {\n    AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);\n    adapter.setOutputChannel(channel);\n    return adapter;\n}\n\n@Bean\npublic SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {\n    SimpleMessageListenerContainer container =\n                               new SimpleMessageListenerContainer(connectionFactory);\n    container.setQueueNames("aName");\n    container.setConcurrentConsumers(2);\n    // ...\n    return container;\n}\n\n@Bean\n@ServiceActivator(inputChannel = "amqpInputChannel")\npublic MessageHandler handler() {\n    return new MessageHandler() {\n\n        @Override\n        public void handleMessage(Message<?> message) throws MessagingException {\n            System.out.println(message.getPayload());\n        }\n\n    };\n}\n')])])]),n("p",[e._v("XML")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('<int-amqp:inbound-channel-adapter\n                                  id="inboundAmqp"                (1)\n                                  channel="inboundChannel"        (2)\n                                  queue-names="si.test.queue"     (3)\n                                  acknowledge-mode="AUTO"         (4)\n                                  advice-chain=""                 (5)\n                                  channel-transacted=""           (6)\n  
                                concurrent-consumers=""         (7)\n                                  connection-factory=""           (8)\n                                  error-channel=""                (9)\n                                  expose-listener-channel=""      (10)\n                                  header-mapper=""                (11)\n                                  mapped-request-headers=""       (12)\n                                  listener-container=""           (13)\n                                  message-converter=""            (14)\n                                  message-properties-converter="" (15)\n                                  phase=""                        (16)\n                                  prefetch-count=""               (17)\n                                  receive-timeout=""              (18)\n                                  recovery-interval=""            (19)\n                                  missing-queues-fatal=""         (20)\n                                  shutdown-timeout=""             (21)\n                                  task-executor=""                (22)\n                                  transaction-attribute=""        (23)\n                                  transaction-manager=""          (24)\n                                  tx-size=""                      (25)\n                                  consumers-per-queue             (26)\n                                  batch-mode="MESSAGES"/>         (27)\n')])])]),n("table",[n("thead",[n("tr",[n("th",[n("strong",[e._v("1")])]),e._v(" "),n("th",[e._v("The unique ID for this adapter."),n("br"),e._v("Optional.")])])]),e._v(" "),n("tbody",[n("tr",[n("td",[n("strong",[e._v("2")])]),e._v(" "),n("td",[e._v("Message channel to which converted messages should be sent."),n("br"),e._v("Required.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("3")])]),e._v(" "),n("td",[e._v("Names of the AMQP queues (comma-separated list) from which messages should 
be consumed."),n("br"),e._v("Required.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("4")])]),e._v(" "),n("td",[e._v("Acknowledge mode for the "),n("code",[e._v("MessageListenerContainer")]),e._v("."),n("br"),e._v("When set to "),n("code",[e._v("MANUAL")]),e._v(", the delivery tag and channel are provided in message headers "),n("code",[e._v("amqp_deliveryTag")]),e._v(" and "),n("code",[e._v("amqp_channel")]),e._v(", respectively."),n("br"),e._v("The user application is responsible for acknowledgement."),n("code",[e._v("NONE")]),e._v(" means no acknowledgements ("),n("code",[e._v("autoAck")]),e._v(")."),n("code",[e._v("AUTO")]),e._v(" means the adapter’s container acknowledges when the downstream flow completes."),n("br"),e._v("Optional (defaults to AUTO)."),n("br"),e._v("See "),n("a",{attrs:{href:"#amqp-inbound-ack"}},[e._v("Inbound Endpoint Acknowledge Mode")]),e._v(".")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("5")])]),e._v(" "),n("td",[e._v("Extra AOP Advices to handle cross-cutting behavior associated with this inbound channel adapter."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("6")])]),e._v(" "),n("td",[e._v("Flag to indicate that channels created by this component are transactional."),n("br"),e._v("If true, it tells the framework to use a transactional channel and to end all operations (send or receive) with a commit or rollback, depending on the outcome, with an exception that signals a rollback."),n("br"),e._v("Optional (Defaults to false).")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("7")])]),e._v(" "),n("td",[e._v("Specify the number of concurrent consumers to create."),n("br"),e._v("The default is "),n("code",[e._v("1")]),e._v("."),n("br"),e._v("We recommend raising the number of concurrent consumers to scale the consumption of messages coming in from a queue."),n("br"),e._v("However, note that any ordering guarantees are lost once multiple consumers are registered."),n("br"),e._v("In general, use one 
consumer for low-volume queues."),n("br"),e._v("Not allowed when 'consumers-per-queue' is set."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("8")])]),e._v(" "),n("td",[e._v("Bean reference to the RabbitMQ "),n("code",[e._v("ConnectionFactory")]),e._v("."),n("br"),e._v("Optional (defaults to "),n("code",[e._v("connectionFactory")]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("9")])]),e._v(" "),n("td",[e._v("Message channel to which error messages should be sent."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("10")])]),e._v(" "),n("td",[e._v("Whether the listener channel (com.rabbitmq.client.Channel) is exposed to a registered "),n("code",[e._v("ChannelAwareMessageListener")]),e._v("."),n("br"),e._v("Optional (defaults to true).")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("11")])]),e._v(" "),n("td",[e._v("A reference to an "),n("code",[e._v("AmqpHeaderMapper")]),e._v(" to use when receiving AMQP Messages."),n("br"),e._v("Optional."),n("br"),e._v("By default, only standard AMQP properties (such as "),n("code",[e._v("contentType")]),e._v(") are copied to Spring Integration "),n("code",[e._v("MessageHeaders")]),e._v("."),n("br"),e._v("Any user-defined headers within the AMQP "),n("code",[e._v("MessageProperties")]),e._v(" are NOT copied to the message by the default "),n("code",[e._v("DefaultAmqpHeaderMapper")]),e._v("."),n("br"),e._v("Not allowed if 'request-header-names' is provided.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("12")])]),e._v(" "),n("td",[e._v("Comma-separated list of the names of AMQP Headers to be mapped from the AMQP request into the "),n("code",[e._v("MessageHeaders")]),e._v("."),n("br"),e._v("This can only be provided if the 'header-mapper' reference is not provided."),n("br"),e._v('The values in this list can also be simple patterns to be matched against the header names (such as "*" or "thing1*, thing2" or "*something").')])]),e._v(" 
"),n("tr",[n("td",[n("strong",[e._v("13")])]),e._v(" "),n("td",[e._v("Reference to the "),n("code",[e._v("AbstractMessageListenerContainer")]),e._v(" to use for receiving AMQP Messages."),n("br"),e._v("If this attribute is provided, no other attribute related to the listener container configuration should be provided."),n("br"),e._v("In other words, by setting this reference, you must take full responsibility for the listener container configuration."),n("br"),e._v("The only exception is the "),n("code",[e._v("MessageListener")]),e._v(" itself."),n("br"),e._v("Since that is actually the core responsibility of this channel adapter implementation, the referenced listener container must not already have its own "),n("code",[e._v("MessageListener")]),e._v("."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("14")])]),e._v(" "),n("td",[e._v("The "),n("code",[e._v("MessageConverter")]),e._v(" to use when receiving AMQP messages."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("15")])]),e._v(" "),n("td",[e._v("The "),n("code",[e._v("MessagePropertiesConverter")]),e._v(" to use when receiving AMQP messages."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("16")])]),e._v(" "),n("td",[e._v("Specifies the phase in which the underlying "),n("code",[e._v("AbstractMessageListenerContainer")]),e._v(" should be started and stopped."),n("br"),e._v("The startup order proceeds from lowest to highest, and the shutdown order is the reverse of that."),n("br"),e._v("By default, this value is "),n("code",[e._v("Integer.MAX_VALUE")]),e._v(", meaning that this container starts as late as possible and stops as soon as possible."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("17")])]),e._v(" "),n("td",[e._v("Tells the AMQP broker how many messages to send to each consumer in a single request."),n("br"),e._v("Often, you can set this value high to improve throughput."),n("br"),e._v("It 
should be greater than or equal to the transaction size (see the "),n("code",[e._v("tx-size")]),e._v(" attribute, later in this list)."),n("br"),e._v("Optional (defaults to "),n("code",[e._v("1")]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("18")])]),e._v(" "),n("td",[e._v("Receive timeout in milliseconds."),n("br"),e._v("Optional (defaults to "),n("code",[e._v("1000")]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("19")])]),e._v(" "),n("td",[e._v("Specifies the interval between recovery attempts of the underlying "),n("code",[e._v("AbstractMessageListenerContainer")]),e._v(" (in milliseconds)."),n("br"),e._v("Optional (defaults to "),n("code",[e._v("5000")]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("20")])]),e._v(" "),n("td",[e._v("If 'true' and none of the queues are available on the broker, the container throws a fatal exception during startup and stops if the queues are deleted when the container is running (after making three attempts to passively declare the queues)."),n("br"),e._v("If "),n("code",[e._v("false")]),e._v(", the container does not throw an exception and goes into recovery mode, attempting to restart according to the "),n("code",[e._v("recovery-interval")]),e._v("."),n("br"),e._v("Optional (defaults to "),n("code",[e._v("true")]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("21")])]),e._v(" "),n("td",[e._v("The time to wait for workers (in milliseconds) after the underlying "),n("code",[e._v("AbstractMessageListenerContainer")]),e._v(" is stopped and before the AMQP connection is forced closed."),n("br"),e._v("If any workers are active when the shutdown signal comes, they are allowed to finish processing as long as they can finish within this timeout."),n("br"),e._v("Otherwise, the connection is closed and messages remain unacknowledged (if the channel is transactional)."),n("br"),e._v("Optional (defaults to "),n("code",[e._v("5000")]),e._v(").")])]),e._v(" 
"),n("tr",[n("td",[n("strong",[e._v("22")])]),e._v(" "),n("td",[e._v("By default, the underlying "),n("code",[e._v("AbstractMessageListenerContainer")]),e._v(" uses a "),n("code",[e._v("SimpleAsyncTaskExecutor")]),e._v(" implementation, that fires up a new thread for each task, running it asynchronously."),n("br"),e._v("By default, the number of concurrent threads is unlimited."),n("br"),e._v("Note that this implementation does not reuse threads."),n("br"),e._v("Consider using a thread-pooling "),n("code",[e._v("TaskExecutor")]),e._v(" implementation as an alternative."),n("br"),e._v("Optional (defaults to "),n("code",[e._v("SimpleAsyncTaskExecutor")]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("23")])]),e._v(" "),n("td",[e._v("By default, the underlying "),n("code",[e._v("AbstractMessageListenerContainer")]),e._v(" creates a new instance of the "),n("code",[e._v("DefaultTransactionAttribute")]),e._v(" (it takes the EJB approach to rolling back on runtime but not checked exceptions)."),n("br"),e._v("Optional (defaults to "),n("code",[e._v("DefaultTransactionAttribute")]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("24")])]),e._v(" "),n("td",[e._v("Sets a bean reference to an external "),n("code",[e._v("PlatformTransactionManager")]),e._v(" on the underlying "),n("code",[e._v("AbstractMessageListenerContainer")]),e._v("."),n("br"),e._v("The transaction manager works in conjunction with the "),n("code",[e._v("channel-transacted")]),e._v(" attribute."),n("br"),e._v("If there is already a transaction in progress when the framework is sending or receiving a message and the "),n("code",[e._v("channelTransacted")]),e._v(" flag is "),n("code",[e._v("true")]),e._v(", the commit or rollback of the messaging transaction is deferred until the end of the current transaction."),n("br"),e._v("If the "),n("code",[e._v("channelTransacted")]),e._v(" flag is "),n("code",[e._v("false")]),e._v(", no transaction semantics apply to the messaging operation (it 
is auto-acked)."),n("br"),e._v("For further information, see"),n("a",{attrs:{href:"https://docs.spring.io/spring-amqp/reference/html/%255Freference.html#%5Ftransactions",target:"_blank",rel:"noopener noreferrer"}},[e._v("Transactions with Spring AMQP"),n("OutboundLink")],1),e._v("."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("25")])]),e._v(" "),n("td",[e._v("Tells the "),n("code",[e._v("SimpleMessageListenerContainer")]),e._v(" how many messages to process in a single transaction (if the channel is transactional)."),n("br"),e._v("For best results, it should be less than or equal to the value set in "),n("code",[e._v("prefetch-count")]),e._v("."),n("br"),e._v("Not allowed when 'consumers-per-queue' is set."),n("br"),e._v("Optional (defaults to "),n("code",[e._v("1")]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("26")])]),e._v(" "),n("td",[e._v("Indicates that the underlying listener container should be a "),n("code",[e._v("DirectMessageListenerContainer")]),e._v(" instead of the default "),n("code",[e._v("SimpleMessageListenerContainer")]),e._v("."),n("br"),e._v("See the "),n("a",{attrs:{href:"https://docs.spring.io/spring-amqp/reference/html/",target:"_blank",rel:"noopener noreferrer"}},[e._v("Spring AMQP Reference Manual"),n("OutboundLink")],1),e._v(" for more information.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("27")])]),e._v(" "),n("td",[e._v("When the container’s "),n("code",[e._v("consumerBatchEnabled")]),e._v(" is "),n("code",[e._v("true")]),e._v(", determines how the adapter presents the batch of messages in the message payload."),n("br"),e._v("When set to "),n("code",[e._v("MESSAGES")]),e._v(" (default), the payload is a "),n("code",[e._v("List<Message<?>>")]),e._v(" where each message has headers mapped from the incoming AMQP "),n("code",[e._v("Message")]),e._v(" and the payload is the converted "),n("code",[e._v("body")]),e._v("."),n("br"),e._v("When set to 
"),n("code",[e._v("EXTRACT_PAYLOADS")]),e._v(", the payload is a "),n("code",[e._v("List<?>")]),e._v(" where the elements are converted from the AMQP "),n("code",[e._v("Message")]),e._v(" body."),n("code",[e._v("EXTRACT_PAYLOADS_WITH_HEADERS")]),e._v(" is similar to "),n("code",[e._v("EXTRACT_PAYLOADS")]),e._v(" but, in addition, the headers from each message are mapped from the "),n("code",[e._v("MessageProperties")]),e._v(" into a "),n("code",[e._v("List<Map<String, Object>")]),e._v(" at the corresponding index; the header name is "),n("code",[e._v("AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS")]),e._v(".")])])])]),e._v(" "),n("table",[n("thead",[n("tr",[n("th"),e._v(" "),n("th",[e._v("container"),n("br"),n("br"),e._v("Note that when configuring an external container with XML, you cannot use the Spring AMQP namespace to define the container."),n("br"),e._v("This is because the namespace requires at least one "),n("code",[e._v("<listener/>")]),e._v(" element."),n("br"),e._v("In this environment, the listener is internal to the adapter."),n("br"),e._v("For this reason, you must define the container by using a normal Spring "),n("code",[e._v("<bean/>")]),e._v(" definition, as the following example shows:"),n("br"),n("br"),n("code",[e._v('<br/><bean id="container"<br/> class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"><br/> <property name="connectionFactory" ref="connectionFactory" /><br/> <property name="queueNames" value="aName.queue" /><br/> <property name="defaultRequeueRejected" value="false"/><br/></bean><br/>')])])])]),e._v(" "),n("tbody")]),e._v(" "),n("table",[n("thead",[n("tr",[n("th"),e._v(" "),n("th",[e._v("Even though the Spring Integration JMS and AMQP support is similar, important differences exist."),n("br"),e._v("The JMS inbound channel adapter is using a "),n("code",[e._v("JmsDestinationPollingSource")]),e._v(" under the covers and expects a configured poller."),n("br"),e._v("The AMQP inbound channel adapter uses an 
"),n("code",[e._v("AbstractMessageListenerContainer")]),e._v(" and is message driven."),n("br"),e._v("In that regard, it is more similar to the JMS message-driven channel adapter.")])])]),e._v(" "),n("tbody")]),e._v(" "),n("p",[e._v("Starting with version 5.5, the "),n("code",[e._v("AmqpInboundChannelAdapter")]),e._v(" can be configured with an "),n("code",[e._v("org.springframework.amqp.rabbit.retry.MessageRecoverer")]),e._v(" strategy which is used in the "),n("code",[e._v("RecoveryCallback")]),e._v(" when the retry operation is called internally.\nSee "),n("code",[e._v("setMessageRecoverer()")]),e._v(" JavaDocs for more information.")]),e._v(" "),n("h4",{attrs:{id:"batched-messages"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#batched-messages"}},[e._v("#")]),e._v(" Batched Messages")]),e._v(" "),n("p",[e._v("See "),n("a",{attrs:{href:"https://docs.spring.io/spring-amqp/docs/current/reference/html/#template-batching",target:"_blank",rel:"noopener noreferrer"}},[e._v("the Spring AMQP Documentation"),n("OutboundLink")],1),e._v(" for more information about batched messages.")]),e._v(" "),n("p",[e._v("To produce batched messages with Spring Integration, simply configure the outbound endpoint with a "),n("code",[e._v("BatchingRabbitTemplate")]),e._v(".")]),e._v(" "),n("p",[e._v("When receiving batched messages, by default, the listener containers extract each fragment message and the adapter will produce a "),n("code",[e._v("Message<?>")]),e._v(" for each fragment.\nStarting with version 5.2, if the container’s "),n("code",[e._v("deBatchingEnabled")]),e._v(" property is set to "),n("code",[e._v("false")]),e._v(", the de-batching is performed by the adapter instead, and a single "),n("code",[e._v("Message<List<?>>")]),e._v(" is produced with the payload being a list of the fragment payloads (after conversion if appropriate).")]),e._v(" "),n("p",[e._v("The default "),n("code",[e._v("BatchingStrategy")]),e._v(" is the 
"),n("code",[e._v("SimpleBatchingStrategy")]),e._v(", but this can be overridden on the adapter.")]),e._v(" "),n("table",[n("thead",[n("tr",[n("th"),e._v(" "),n("th",[e._v("The "),n("code",[e._v("org.springframework.amqp.rabbit.retry.MessageBatchRecoverer")]),e._v(" must be used with batches when recovery is required for retry operations.")])])]),e._v(" "),n("tbody")]),e._v(" "),n("h3",{attrs:{id:"polled-inbound-channel-adapter"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#polled-inbound-channel-adapter"}},[e._v("#")]),e._v(" Polled Inbound Channel Adapter")]),e._v(" "),n("h4",{attrs:{id:"overview"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#overview"}},[e._v("#")]),e._v(" Overview")]),e._v(" "),n("p",[e._v("Version 5.0.1 introduced a polled channel adapter, letting you fetch individual messages on demand — for example, with a "),n("code",[e._v("MessageSourcePollingTemplate")]),e._v(" or a poller.\nSee "),n("RouterLink",{attrs:{to:"/en/spring-integration/polling-consumer.html#deferred-acks-message-source"}},[e._v("Deferred Acknowledgment Pollable Message Source")]),e._v(" for more information.")],1),e._v(" "),n("p",[e._v("It does not currently support XML configuration.")]),e._v(" "),n("p",[e._v("The following example shows how to configure an "),n("code",[e._v("AmqpMessageSource")]),e._v(":")]),e._v(" "),n("p",[e._v("Java DSL")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v("@Bean\npublic IntegrationFlow flow() {\n    return IntegrationFlows.from(Amqp.inboundPolledAdapter(connectionFactory(), DSL_QUEUE),\n                    e -> e.poller(Pollers.fixedDelay(1_000)).autoStartup(false))\n            .handle(p -> {\n                ...\n            })\n            .get();\n}\n")])])]),n("p",[e._v("Java")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('@Bean\npublic AmqpMessageSource 
source(ConnectionFactory connectionFactory) {\n    return new AmqpMessageSource(connectionFactory, "someQueue");\n}\n')])])]),n("p",[e._v("See the "),n("a",{attrs:{href:"https://docs.spring.io/spring-integration/api/org/springframework/integration/amqp/inbound/AmqpMessageSource.html",target:"_blank",rel:"noopener noreferrer"}},[e._v("Javadoc"),n("OutboundLink")],1),e._v(" for configuration properties.")]),e._v(" "),n("p",[e._v("XML")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v("This adapter currently does not have XML configuration support.\n")])])]),n("h4",{attrs:{id:"batched-messages-2"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#batched-messages-2"}},[e._v("#")]),e._v(" Batched Messages")]),e._v(" "),n("p",[e._v("See "),n("a",{attrs:{href:"#amqp-debatching"}},[e._v("Batched Messages")]),e._v(".")]),e._v(" "),n("p",[e._v("For the polled adapter, there is no listener container, batched messages are always debatched (if the "),n("code",[e._v("BatchingStrategy")]),e._v(" supports doing so).")]),e._v(" "),n("h3",{attrs:{id:"inbound-gateway"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#inbound-gateway"}},[e._v("#")]),e._v(" Inbound Gateway")]),e._v(" "),n("p",[e._v("The inbound gateway supports all the attributes on the inbound channel adapter (except that 'channel' is replaced by 'request-channel'), plus some additional attributes.\nThe following listing shows the available attributes:")]),e._v(" "),n("p",[e._v("Java DSL")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('@Bean // return the upper cased payload\npublic IntegrationFlow amqpInboundGateway(ConnectionFactory connectionFactory) {\n    return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, "foo"))\n            .transform(String.class, String::toUpperCase)\n            .get();\n}\n')])])]),n("p",[e._v("Java")]),e._v(" 
"),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('@Bean\npublic MessageChannel amqpInputChannel() {\n    return new DirectChannel();\n}\n\n@Bean\npublic AmqpInboundGateway inbound(SimpleMessageListenerContainer listenerContainer,\n        @Qualifier("amqpInputChannel") MessageChannel channel) {\n    AmqpInboundGateway gateway = new AmqpInboundGateway(listenerContainer);\n    gateway.setRequestChannel(channel);\n    gateway.setDefaultReplyTo("bar");\n    return gateway;\n}\n\n@Bean\npublic SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {\n    SimpleMessageListenerContainer container =\n                    new SimpleMessageListenerContainer(connectionFactory);\n    container.setQueueNames("foo");\n    container.setConcurrentConsumers(2);\n    // ...\n    return container;\n}\n\n@Bean\n@ServiceActivator(inputChannel = "amqpInputChannel")\npublic MessageHandler handler() {\n    return new AbstractReplyProducingMessageHandler() {\n\n        @Override\n        protected Object handleRequestMessage(Message<?> requestMessage) {\n            return "reply to " + requestMessage.getPayload();\n        }\n\n    };\n}\n')])])]),n("p",[e._v("XML")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('<int-amqp:inbound-gateway\n                          id="inboundGateway"                (1)\n                          request-channel="myRequestChannel" (2)\n                          header-mapper=""                   (3)\n                          mapped-request-headers=""          (4)\n                          mapped-reply-headers=""            (5)\n                          reply-channel="myReplyChannel"     (6)\n                          reply-timeout="1000"               (7)\n                          amqp-template=""                   (8)\n                          default-reply-to="" />             
(9)\n')])])]),n("table",[n("thead",[n("tr",[n("th",[n("strong",[e._v("1")])]),e._v(" "),n("th",[e._v("The Unique ID for this adapter."),n("br"),e._v("Optional.")])])]),e._v(" "),n("tbody",[n("tr",[n("td",[n("strong",[e._v("2")])]),e._v(" "),n("td",[e._v("Message channel to which converted messages are sent."),n("br"),e._v("Required.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("3")])]),e._v(" "),n("td",[e._v("A reference to an "),n("code",[e._v("AmqpHeaderMapper")]),e._v(" to use when receiving AMQP Messages."),n("br"),e._v("Optional."),n("br"),e._v("By default, only standard AMQP properties (such as "),n("code",[e._v("contentType")]),e._v(") are copied to and from Spring Integration "),n("code",[e._v("MessageHeaders")]),e._v("."),n("br"),e._v("Any user-defined headers within the AMQP "),n("code",[e._v("MessageProperties")]),e._v(" are not copied to or from an AMQP message by the default "),n("code",[e._v("DefaultAmqpHeaderMapper")]),e._v("."),n("br"),e._v("Not allowed if 'request-header-names' or 'reply-header-names' is provided.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("4")])]),e._v(" "),n("td",[e._v("Comma-separated list of names of AMQP Headers to be mapped from the AMQP request into the "),n("code",[e._v("MessageHeaders")]),e._v("."),n("br"),e._v("This attribute can be provided only if the 'header-mapper' reference is not provided."),n("br"),e._v("The values in this list can also be simple patterns to be matched against the header names (e.g. 
"),n("code",[e._v('"*"')]),e._v(" or "),n("code",[e._v('"thing1*, thing2"')]),e._v(" or "),n("code",[e._v('"*thing1"')]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("5")])]),e._v(" "),n("td",[e._v("Comma-separated list of names of "),n("code",[e._v("MessageHeaders")]),e._v(" to be mapped into the AMQP message properties of the AMQP reply message."),n("br"),e._v("All standard Headers (such as "),n("code",[e._v("contentType")]),e._v(") are mapped to AMQP Message Properties, while user-defined headers are mapped to the 'headers' property."),n("br"),e._v("This attribute can only be provided if the 'header-mapper' reference is not provided."),n("br"),e._v("The values in this list can also be simple patterns to be matched against the header names (for example, "),n("code",[e._v('"*"')]),e._v(" or "),n("code",[e._v('"foo*, bar"')]),e._v(" or "),n("code",[e._v('"*foo"')]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("6")])]),e._v(" "),n("td",[e._v("Message Channel where reply Messages are expected."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("7")])]),e._v(" "),n("td",[e._v("Sets the "),n("code",[e._v("receiveTimeout")]),e._v(" on the underlying "),n("code",[e._v("o.s.i.core.MessagingTemplate")]),e._v(" for receiving messages from the reply channel."),n("br"),e._v("If not specified, this property defaults to "),n("code",[e._v("1000")]),e._v(" (1 second)."),n("br"),e._v("Only applies if the container thread hands off to another thread before the reply is sent.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("8")])]),e._v(" "),n("td",[e._v("The customized "),n("code",[e._v("AmqpTemplate")]),e._v(" bean reference (to have more control over the reply messages to send)."),n("br"),e._v("You can provide an alternative implementation to the "),n("code",[e._v("RabbitTemplate")]),e._v(".")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("9")])]),e._v(" "),n("td",[e._v("The "),n("code",[e._v("replyTo")]),e._v(" 
"),n("code",[e._v("o.s.amqp.core.Address")]),e._v(" to be used when the "),n("code",[e._v("requestMessage")]),e._v(" does not have a "),n("code",[e._v("replyTo")]),e._v("property."),n("br"),e._v("If this option is not specified, no "),n("code",[e._v("amqp-template")]),e._v(" is provided, no "),n("code",[e._v("replyTo")]),e._v(" property exists in the request message, and"),n("br"),e._v("an "),n("code",[e._v("IllegalStateException")]),e._v(" is thrown because the reply cannot be routed."),n("br"),e._v("If this option is not specified and an external "),n("code",[e._v("amqp-template")]),e._v(" is provided, no exception is thrown."),n("br"),e._v("You must either specify this option or configure a default "),n("code",[e._v("exchange")]),e._v(" and "),n("code",[e._v("routingKey")]),e._v(" on that template,"),n("br"),e._v("if you anticipate cases when no "),n("code",[e._v("replyTo")]),e._v(" property exists in the request message.")])])])]),e._v(" "),n("p",[e._v("See the note in "),n("a",{attrs:{href:"#amqp-inbound-channel-adapter"}},[e._v("Inbound Channel Adapter")]),e._v(" about configuring the "),n("code",[e._v("listener-container")]),e._v(" attribute.")]),e._v(" "),n("p",[e._v("Starting with version 5.5, the "),n("code",[e._v("AmqpInboundChannelAdapter")]),e._v(" can be configured with an "),n("code",[e._v("org.springframework.amqp.rabbit.retry.MessageRecoverer")]),e._v(" strategy which is used in the "),n("code",[e._v("RecoveryCallback")]),e._v(" when the retry operation is called internally.\nSee "),n("code",[e._v("setMessageRecoverer()")]),e._v(" JavaDocs for more information.")]),e._v(" "),n("h4",{attrs:{id:"batched-messages-3"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#batched-messages-3"}},[e._v("#")]),e._v(" Batched Messages")]),e._v(" "),n("p",[e._v("See "),n("a",{attrs:{href:"#amqp-debatching"}},[e._v("Batched Messages")]),e._v(".")]),e._v(" 
"),n("h3",{attrs:{id:"inbound-endpoint-acknowledge-mode"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#inbound-endpoint-acknowledge-mode"}},[e._v("#")]),e._v(" Inbound Endpoint Acknowledge Mode")]),e._v(" "),n("p",[e._v("By default, the inbound endpoints use the "),n("code",[e._v("AUTO")]),e._v(" acknowledge mode, which means the container automatically acknowledges the message when the downstream integration flow completes (or a message is handed off to another thread by using a "),n("code",[e._v("QueueChannel")]),e._v(" or "),n("code",[e._v("ExecutorChannel")]),e._v(").\nSetting the mode to "),n("code",[e._v("NONE")]),e._v(" configures the consumer such that acknowledgments are not used at all (the broker automatically acknowledges the message as soon as it is sent).\nSetting the mode to "),n("code",[e._v("MANUAL")]),e._v(" lets user code acknowledge the message at some other point during processing.\nTo support this, with this mode, the endpoints provide the "),n("code",[e._v("Channel")]),e._v(" and "),n("code",[e._v("deliveryTag")]),e._v(" in the "),n("code",[e._v("amqp_channel")]),e._v(" and "),n("code",[e._v("amqp_deliveryTag")]),e._v(" headers, respectively.")]),e._v(" "),n("p",[e._v("You can perform any valid Rabbit command on the "),n("code",[e._v("Channel")]),e._v(" but, generally, only "),n("code",[e._v("basicAck")]),e._v(" and "),n("code",[e._v("basicNack")]),e._v(" (or "),n("code",[e._v("basicReject")]),e._v(") are used.\nIn order to not interfere with the operation of the container, you should not retain a reference to the channel and use it only in the context of the current message.")]),e._v(" "),n("table",[n("thead",[n("tr",[n("th"),e._v(" "),n("th",[e._v("Since the "),n("code",[e._v("Channel")]),e._v(" is a reference to a “live” object, it cannot be serialized and is lost if a message is persisted.")])])]),e._v(" "),n("tbody")]),e._v(" "),n("p",[e._v("The following example shows how you might use "),n("code",[e._v("MANUAL")]),e._v(" 
acknowledgement:")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('@ServiceActivator(inputChannel = "foo", outputChannel = "bar")\npublic Object handle(@Payload String payload, @Header(AmqpHeaders.CHANNEL) Channel channel,\n        @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws Exception {\n\n    // Do some processing\n\n    if (allOK) {\n        channel.basicAck(deliveryTag, false);\n\n        // perhaps do some more processing\n\n    }\n    else {\n        channel.basicNack(deliveryTag, false, true);\n    }\n    return someResultForDownStreamProcessing;\n}\n')])])]),n("h3",{attrs:{id:"outbound-endpoints"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#outbound-endpoints"}},[e._v("#")]),e._v(" Outbound Endpoints")]),e._v(" "),n("p",[e._v("The following outbound endpoints have many similar configuration options.\nStarting with version 5.2, the "),n("code",[e._v("confirm-timeout")]),e._v(' has been added.\nNormally, when publisher confirms are enabled, the broker will quickly return an ack (or nack) which will be sent to the appropriate channel.\nIf a channel is closed before the confirm is received, the Spring AMQP framework will synthesize a nack.\n"Missing" acks should never occur but, if you set this property, the endpoint will periodically check for them and synthesize a nack if the time elapses without a confirm being received.')]),e._v(" "),n("h3",{attrs:{id:"outbound-channel-adapter"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#outbound-channel-adapter"}},[e._v("#")]),e._v(" Outbound Channel Adapter")]),e._v(" "),n("p",[e._v("The following example shows the available properties for an AMQP outbound channel adapter:")]),e._v(" "),n("p",[e._v("Java DSL")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v("@Bean\npublic IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate,\n        
MessageChannel amqpOutboundChannel) {\n    return IntegrationFlows.from(amqpOutboundChannel)\n            .handle(Amqp.outboundAdapter(amqpTemplate)\n                        .routingKey(\"queue1\")) // default exchange - route to queue 'queue1'\n            .get();\n}\n")])])]),n("p",[e._v("Java")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('@Bean\n@ServiceActivator(inputChannel = "amqpOutboundChannel")\npublic AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {\n    AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);\n    outbound.setRoutingKey("queue1"); // default exchange - route to queue \'queue1\'\n    return outbound;\n}\n\n@Bean\npublic MessageChannel amqpOutboundChannel() {\n    return new DirectChannel();\n}\n')])])]),n("p",[e._v("XML")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('<int-amqp:outbound-channel-adapter id="outboundAmqp"             (1)\n                               channel="outboundChannel"         (2)\n                               amqp-template="myAmqpTemplate"    (3)\n                               exchange-name=""                  (4)\n                               exchange-name-expression=""       (5)\n                               order="1"                         (6)\n                               routing-key=""                    (7)\n                               routing-key-expression=""         (8)\n                               default-delivery-mode""           (9)\n                               confirm-correlation-expression="" (10)\n                               confirm-ack-channel=""            (11)\n                               confirm-nack-channel=""           (12)\n                               confirm-timeout=""                (13)\n                               wait-for-confirm=""               (14)\n                          
     return-channel=""                 (15)\n                               error-message-strategy=""         (16)\n                               header-mapper=""                  (17)\n                               mapped-request-headers=""         (18)\n                               lazy-connect="true"               (19)\n                               multi-send="false"/>              (20)\n')])])]),n("table",[n("thead",[n("tr",[n("th",[n("strong",[e._v("1")])]),e._v(" "),n("th",[e._v("The unique ID for this adapter."),n("br"),e._v("Optional.")])])]),e._v(" "),n("tbody",[n("tr",[n("td",[n("strong",[e._v("2")])]),e._v(" "),n("td",[e._v("Message channel to which messages should be sent to have them converted and published to an AMQP exchange."),n("br"),e._v("Required.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("3")])]),e._v(" "),n("td",[e._v("Bean reference to the configured AMQP template."),n("br"),e._v("Optional (defaults to "),n("code",[e._v("amqpTemplate")]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("4")])]),e._v(" "),n("td",[e._v("The name of the AMQP exchange to which messages are sent."),n("br"),e._v("If not provided, messages are sent to the default, no-name exchange."),n("br"),e._v("Mutually exclusive with 'exchange-name-expression'."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("5")])]),e._v(" "),n("td",[e._v("A SpEL expression that is evaluated to determine the name of the AMQP exchange to which messages are sent, with the message as the root object."),n("br"),e._v("If not provided, messages are sent to the default, no-name exchange."),n("br"),e._v("Mutually exclusive with 'exchange-name'."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("6")])]),e._v(" "),n("td",[e._v("The order for this consumer when multiple consumers are registered, thereby enabling load-balancing and failover."),n("br"),e._v("Optional (defaults to "),n("code",[e._v("Ordered.LOWEST_PRECEDENCE 
[=Integer.MAX_VALUE]")]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("7")])]),e._v(" "),n("td",[e._v("The fixed routing-key to use when sending messages."),n("br"),e._v("By default, this is an empty "),n("code",[e._v("String")]),e._v("."),n("br"),e._v("Mutually exclusive with 'routing-key-expression'."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("8")])]),e._v(" "),n("td",[e._v("A SpEL expression that is evaluated to determine the routing key to use when sending messages, with the message as the root object (for example, 'payload.key')."),n("br"),e._v("By default, this is an empty "),n("code",[e._v("String")]),e._v("."),n("br"),e._v("Mutually exclusive with 'routing-key'."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("9")])]),e._v(" "),n("td",[e._v("The default delivery mode for messages: "),n("code",[e._v("PERSISTENT")]),e._v(" or "),n("code",[e._v("NON_PERSISTENT")]),e._v("."),n("br"),e._v("Overridden if the "),n("code",[e._v("header-mapper")]),e._v(" sets the delivery mode."),n("br"),e._v("If the Spring Integration message header "),n("code",[e._v("amqp_deliveryMode")]),e._v(" is present, the "),n("code",[e._v("DefaultHeaderMapper")]),e._v(" sets the value."),n("br"),e._v("If this attribute is not supplied and the header mapper does not set it, the default depends on the underlying Spring AMQP "),n("code",[e._v("MessagePropertiesConverter")]),e._v(" used by the "),n("code",[e._v("RabbitTemplate")]),e._v("."),n("br"),e._v("If that is not customized at all, the default is "),n("code",[e._v("PERSISTENT")]),e._v("."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("10")])]),e._v(" "),n("td",[e._v("An expression that defines correlation data."),n("br"),e._v("When provided, this configures the underlying AMQP template to receive publisher confirmations."),n("br"),e._v("Requires a dedicated "),n("code",[e._v("RabbitTemplate")]),e._v(" and a 
"),n("code",[e._v("CachingConnectionFactory")]),e._v(" with the "),n("code",[e._v("publisherConfirms")]),e._v(" property set to "),n("code",[e._v("true")]),e._v("."),n("br"),e._v("When a publisher confirmation is received and correlation data is supplied, it is written to either the "),n("code",[e._v("confirm-ack-channel")]),e._v(" or the "),n("code",[e._v("confirm-nack-channel")]),e._v(", depending on the confirmation type."),n("br"),e._v("The payload of the confirmation is the correlation data, as defined by this expression."),n("br"),e._v("The message has an 'amqp_publishConfirm' header set to "),n("code",[e._v("true")]),e._v(" ("),n("code",[e._v("ack")]),e._v(") or "),n("code",[e._v("false")]),e._v(" ("),n("code",[e._v("nack")]),e._v(")."),n("br"),e._v("Examples: "),n("code",[e._v("headers['myCorrelationData']")]),e._v(" and "),n("code",[e._v("payload")]),e._v("."),n("br"),e._v("Version 4.1 introduced the "),n("code",[e._v("amqp_publishConfirmNackCause")]),e._v(" message header."),n("br"),e._v("It contains the "),n("code",[e._v("cause")]),e._v(" of a 'nack' for a publisher confirmation."),n("br"),e._v("Starting with version 4.2, if the expression resolves to a "),n("code",[e._v("Message<?>")]),e._v(" instance (such as "),n("code",[e._v("#this")]),e._v("), the message emitted on the "),n("code",[e._v("ack")]),e._v("/"),n("code",[e._v("nack")]),e._v(" channel is based on that message, with the additional header(s) added."),n("br"),e._v("Previously, a new message was created with the correlation data as its payload, regardless of type."),n("br"),e._v("Also see "),n("a",{attrs:{href:"#alternative-confirms-returns"}},[e._v("Alternative Mechanism for Publisher Confirms and Returns")]),e._v("."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("11")])]),e._v(" "),n("td",[e._v("The channel to which positive ("),n("code",[e._v("ack")]),e._v(") publisher confirms are sent."),n("br"),e._v("The payload is the correlation data defined by the 
"),n("code",[e._v("confirm-correlation-expression")]),e._v("."),n("br"),e._v("If the expression is "),n("code",[e._v("#root")]),e._v(" or "),n("code",[e._v("#this")]),e._v(", the message is built from the original message, with the "),n("code",[e._v("amqp_publishConfirm")]),e._v(" header set to "),n("code",[e._v("true")]),e._v("."),n("br"),e._v("Also see "),n("a",{attrs:{href:"#alternative-confirms-returns"}},[e._v("Alternative Mechanism for Publisher Confirms and Returns")]),e._v("."),n("br"),e._v("Optional (the default is "),n("code",[e._v("nullChannel")]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("12")])]),e._v(" "),n("td",[e._v("The channel to which negative ("),n("code",[e._v("nack")]),e._v(") publisher confirmations are sent."),n("br"),e._v("The payload is the correlation data defined by the "),n("code",[e._v("confirm-correlation-expression")]),e._v(" (if there is no "),n("code",[e._v("ErrorMessageStrategy")]),e._v(" configured)."),n("br"),e._v("If the expression is "),n("code",[e._v("#root")]),e._v(" or "),n("code",[e._v("#this")]),e._v(", the message is built from the original message, with the "),n("code",[e._v("amqp_publishConfirm")]),e._v(" header set to "),n("code",[e._v("false")]),e._v("."),n("br"),e._v("When there is an "),n("code",[e._v("ErrorMessageStrategy")]),e._v(", the message is an "),n("code",[e._v("ErrorMessage")]),e._v(" with a "),n("code",[e._v("NackedAmqpMessageException")]),e._v(" payload."),n("br"),e._v("Also see "),n("a",{attrs:{href:"#alternative-confirms-returns"}},[e._v("Alternative Mechanism for Publisher Confirms and Returns")]),e._v("."),n("br"),e._v("Optional (the default is "),n("code",[e._v("nullChannel")]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("13")])]),e._v(" "),n("td",[e._v("When set, the adapter will synthesize a negative acknowledgment (nack) if a publisher confirm is not received within this time in milliseconds."),n("br"),e._v("Pending confirms are checked every 50% of this value, so 
the actual time a nack is sent will be between 1x and 1.5x this value."),n("br"),e._v("Also see "),n("a",{attrs:{href:"#alternative-confirms-returns"}},[e._v("Alternative Mechanism for Publisher Confirms and Returns")]),e._v("."),n("br"),e._v("Default none (nacks will not be generated).")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("14")])]),e._v(" "),n("td",[e._v("When set to true, the calling thread will block, waiting for a publisher confirmation."),n("br"),e._v("This requires a "),n("code",[e._v("RabbitTemplate")]),e._v(" configured for confirms as well as a "),n("code",[e._v("confirm-correlation-expression")]),e._v("."),n("br"),e._v("The thread will block for up to "),n("code",[e._v("confirm-timeout")]),e._v(" (or 5 seconds by default)."),n("br"),e._v("If a timeout occurs, a "),n("code",[e._v("MessageTimeoutException")]),e._v(" will be thrown."),n("br"),e._v("If returns are enabled and a message is returned, or any other exception occurs while awaiting the confirm, a "),n("code",[e._v("MessageHandlingException")]),e._v(" will be thrown, with an appropriate message.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("15")])]),e._v(" "),n("td",[e._v("The channel to which returned messages are sent."),n("br"),e._v("When provided, the underlying AMQP template is configured to return undeliverable messages to the adapter."),n("br"),e._v("When there is no "),n("code",[e._v("ErrorMessageStrategy")]),e._v(" configured, the message is constructed from the data received from AMQP, with the following additional headers: "),n("code",[e._v("amqp_returnReplyCode")]),e._v(", "),n("code",[e._v("amqp_returnReplyText")]),e._v(", "),n("code",[e._v("amqp_returnExchange")]),e._v(", "),n("code",[e._v("amqp_returnRoutingKey")]),e._v("."),n("br"),e._v("When there is an "),n("code",[e._v("ErrorMessageStrategy")]),e._v(", the message is an "),n("code",[e._v("ErrorMessage")]),e._v(" with a "),n("code",[e._v("ReturnedAmqpMessageException")]),e._v(" payload."),n("br"),e._v("Also see 
"),n("a",{attrs:{href:"#alternative-confirms-returns"}},[e._v("Alternative Mechanism for Publisher Confirms and Returns")]),e._v("."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("16")])]),e._v(" "),n("td",[e._v("A reference to an "),n("code",[e._v("ErrorMessageStrategy")]),e._v(" implementation used to build "),n("code",[e._v("ErrorMessage")]),e._v(" instances when sending returned or negatively acknowledged messages.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("17")])]),e._v(" "),n("td",[e._v("A reference to an "),n("code",[e._v("AmqpHeaderMapper")]),e._v(" to use when sending AMQP Messages."),n("br"),e._v("By default, only standard AMQP properties (such as "),n("code",[e._v("contentType")]),e._v(") are copied to the Spring Integration "),n("code",[e._v("MessageHeaders")]),e._v("."),n("br"),e._v("Any user-defined headers are not copied to the message by the default "),n("code",[e._v("DefaultAmqpHeaderMapper")]),e._v("."),n("br"),e._v("Not allowed if 'request-header-names' is provided."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("18")])]),e._v(" "),n("td",[e._v("Comma-separated list of names of AMQP Headers to be mapped from the "),n("code",[e._v("MessageHeaders")]),e._v(" to the AMQP Message."),n("br"),e._v("Not allowed if the 'header-mapper' reference is provided."),n("br"),e._v("The values in this list can also be simple patterns to be matched against the header names (e.g. 
"),n("code",[e._v('"*"')]),e._v(" or "),n("code",[e._v('"thing1*, thing2"')]),e._v(" or "),n("code",[e._v('"*thing1"')]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("19")])]),e._v(" "),n("td",[e._v("When set to "),n("code",[e._v("false")]),e._v(", the endpoint attempts to connect to the broker during application context initialization."),n("br"),e._v("This allows “fail fast” detection of bad configuration but also causes initialization to fail if the broker is down."),n("br"),e._v("When "),n("code",[e._v("true")]),e._v(" (the default), the connection is established (if it does not already exist because some other component established it) when the first message is sent.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("20")])]),e._v(" "),n("td",[e._v("When set to "),n("code",[e._v("true")]),e._v(", payloads of type "),n("code",[e._v("Iterable<Message<?>>")]),e._v(" will be sent as discrete messages on the same channel within the scope of a single "),n("code",[e._v("RabbitTemplate")]),e._v(" invocation."),n("br"),e._v("Requires a "),n("code",[e._v("RabbitTemplate")]),e._v("."),n("br"),e._v("When "),n("code",[e._v("wait-for-confirms")]),e._v(" is true, "),n("code",[e._v("RabbitTemplate.waitForConfirmsOrDie()")]),e._v(" is invoked after the messages have been sent."),n("br"),e._v("With a transactional template, the sends will be performed in either a new transaction or one that has already been started (if present).")])])])]),e._v(" "),n("table",[n("thead",[n("tr",[n("th"),e._v(" "),n("th",[e._v("return-channel"),n("br"),n("br"),e._v("Using a "),n("code",[e._v("return-channel")]),e._v(" requires a "),n("code",[e._v("RabbitTemplate")]),e._v(" with the "),n("code",[e._v("mandatory")]),e._v(" property set to "),n("code",[e._v("true")]),e._v(" and a "),n("code",[e._v("CachingConnectionFactory")]),e._v(" with the "),n("code",[e._v("publisherReturns")]),e._v(" property set to "),n("code",[e._v("true")]),e._v("."),n("br"),e._v("When using multiple outbound 
endpoints with returns, a separate "),n("code",[e._v("RabbitTemplate")]),e._v(" is needed for each endpoint.")])])]),e._v(" "),n("tbody")]),e._v(" "),n("h3",{attrs:{id:"outbound-gateway"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#outbound-gateway"}},[e._v("#")]),e._v(" Outbound Gateway")]),e._v(" "),n("p",[e._v("The following listing shows the possible properties for an AMQP Outbound Gateway:")]),e._v(" "),n("p",[e._v("Java DSL")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('@Bean\npublic IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate) {\n    return f -> f.handle(Amqp.outboundGateway(amqpTemplate)\n                    .routingKey("foo")) // default exchange - route to queue \'foo\'\n            .get();\n}\n\n@MessagingGateway(defaultRequestChannel = "amqpOutbound.input")\npublic interface MyGateway {\n\n    String sendToRabbit(String data);\n\n}\n')])])]),n("p",[e._v("Java")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('@Bean\n@ServiceActivator(inputChannel = "amqpOutboundChannel")\npublic AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {\n    AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);\n    outbound.setExpectReply(true);\n    outbound.setRoutingKey("foo"); // default exchange - route to queue \'foo\'\n    return outbound;\n}\n\n@Bean\npublic MessageChannel amqpOutboundChannel() {\n    return new DirectChannel();\n}\n\n@MessagingGateway(defaultRequestChannel = "amqpOutboundChannel")\npublic interface MyGateway {\n\n    String sendToRabbit(String data);\n\n}\n')])])]),n("p",[e._v("XML")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('<int-amqp:outbound-gateway id="outboundGateway"               (1)\n                           request-channel="myRequestChannel" (2)\n                    
       amqp-template=""                   (3)\n                           exchange-name=""                   (4)\n                           exchange-name-expression=""        (5)\n                           order="1"                          (6)\n                           reply-channel=""                   (7)\n                           reply-timeout=""                   (8)\n                           requires-reply=""                  (9)\n                           routing-key=""                     (10)\n                           routing-key-expression=""          (11)\n                           default-delivery-mode=""           (12)\n                           confirm-correlation-expression=""  (13)\n                           confirm-ack-channel=""             (14)\n                           confirm-nack-channel=""            (15)\n                           confirm-timeout=""                 (16)\n                           return-channel=""                  (17)\n                           error-message-strategy=""          (18)\n                           lazy-connect="true" />             (19)\n')])])]),n("table",[n("thead",[n("tr",[n("th",[n("strong",[e._v("1")])]),e._v(" "),n("th",[e._v("The unique ID for this adapter."),n("br"),e._v("Optional.")])])]),e._v(" "),n("tbody",[n("tr",[n("td",[n("strong",[e._v("2")])]),e._v(" "),n("td",[e._v("Message channel to which messages are sent to have them converted and published to an AMQP exchange."),n("br"),e._v("Required.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("3")])]),e._v(" "),n("td",[e._v("Bean reference to the configured AMQP template."),n("br"),e._v("Optional (defaults to "),n("code",[e._v("amqpTemplate")]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("4")])]),e._v(" "),n("td",[e._v("The name of the AMQP exchange to which messages should be sent."),n("br"),e._v("If not provided, messages are sent to the default, no-name exchange."),n("br"),e._v("Mutually exclusive with 
'exchange-name-expression'."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("5")])]),e._v(" "),n("td",[e._v("A SpEL expression that is evaluated to determine the name of the AMQP exchange to which messages should be sent, with the message as the root object."),n("br"),e._v("If not provided, messages are sent to the default, no-name exchange."),n("br"),e._v("Mutually exclusive with 'exchange-name'."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("6")])]),e._v(" "),n("td",[e._v("The order for this consumer when multiple consumers are registered, thereby enabling load-balancing and failover."),n("br"),e._v("Optional (defaults to "),n("code",[e._v("Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]")]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("7")])]),e._v(" "),n("td",[e._v("Message channel to which replies should be sent after being received from an AMQP queue and converted."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("8")])]),e._v(" "),n("td",[e._v("The time the gateway waits when sending the reply message to the "),n("code",[e._v("reply-channel")]),e._v("."),n("br"),e._v("This only applies if the "),n("code",[e._v("reply-channel")]),e._v(" can block — such as a "),n("code",[e._v("QueueChannel")]),e._v(" with a capacity limit that is currently full."),n("br"),e._v("Defaults to infinity.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("9")])]),e._v(" "),n("td",[e._v("When "),n("code",[e._v("true")]),e._v(", the gateway throws an exception if no reply message is received within the "),n("code",[e._v("AmqpTemplate’s")]),e._v(" "),n("code",[e._v("replyTimeout")]),e._v(" property."),n("br"),e._v("Defaults to "),n("code",[e._v("true")]),e._v(".")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("10")])]),e._v(" "),n("td",[e._v("The "),n("code",[e._v("routing-key")]),e._v(" to use when sending messages."),n("br"),e._v("By default, this is an empty "),n("code",[e._v("String")]),e._v("."),n("br"),e._v("Mutually exclusive 
with 'routing-key-expression'."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("11")])]),e._v(" "),n("td",[e._v("A SpEL expression that is evaluated to determine the "),n("code",[e._v("routing-key")]),e._v(" to use when sending messages, with the message as the root object (for example, 'payload.key')."),n("br"),e._v("By default, this is an empty "),n("code",[e._v("String")]),e._v("."),n("br"),e._v("Mutually exclusive with 'routing-key'."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("12")])]),e._v(" "),n("td",[e._v("The default delivery mode for messages: "),n("code",[e._v("PERSISTENT")]),e._v(" or "),n("code",[e._v("NON_PERSISTENT")]),e._v("."),n("br"),e._v("Overridden if the "),n("code",[e._v("header-mapper")]),e._v(" sets the delivery mode."),n("br"),e._v("If the Spring Integration message header "),n("code",[e._v("amqp_deliveryMode")]),e._v(" is present, the "),n("code",[e._v("DefaultHeaderMapper")]),e._v(" sets the value."),n("br"),e._v("If this attribute is not supplied and the header mapper does not set it, the default depends on the underlying Spring AMQP "),n("code",[e._v("MessagePropertiesConverter")]),e._v(" used by the "),n("code",[e._v("RabbitTemplate")]),e._v("."),n("br"),e._v("If that is not customized at all, the default is "),n("code",[e._v("PERSISTENT")]),e._v("."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("13")])]),e._v(" "),n("td",[e._v("Since version 4.2."),n("br"),e._v("An expression defining correlation data."),n("br"),e._v("When provided, this configures the underlying AMQP template to receive publisher confirms."),n("br"),e._v("Requires a dedicated "),n("code",[e._v("RabbitTemplate")]),e._v(" and a "),n("code",[e._v("CachingConnectionFactory")]),e._v(" with the "),n("code",[e._v("publisherConfirms")]),e._v(" property set to "),n("code",[e._v("true")]),e._v("."),n("br"),e._v("When a publisher confirm is received and correlation data is supplied, it is 
written to either the "),n("code",[e._v("confirm-ack-channel")]),e._v(" or the "),n("code",[e._v("confirm-nack-channel")]),e._v(", depending on the confirmation type."),n("br"),e._v("The payload of the confirm is the correlation data, as defined by this expression."),n("br"),e._v("The message has a header 'amqp_publishConfirm' set to "),n("code",[e._v("true")]),e._v(" ("),n("code",[e._v("ack")]),e._v(") or "),n("code",[e._v("false")]),e._v(" ("),n("code",[e._v("nack")]),e._v(")."),n("br"),e._v("For "),n("code",[e._v("nack")]),e._v(" confirmations, Spring Integration provides an additional header "),n("code",[e._v("amqp_publishConfirmNackCause")]),e._v("."),n("br"),e._v("Examples: "),n("code",[e._v("headers['myCorrelationData']")]),e._v(" and "),n("code",[e._v("payload")]),e._v("."),n("br"),e._v("If the expression resolves to a "),n("code",[e._v("Message<?>")]),e._v(" instance (such as "),n("code",[e._v("#this")]),e._v("), the message"),n("br"),e._v("emitted on the "),n("code",[e._v("ack")]),e._v("/"),n("code",[e._v("nack")]),e._v(" channel is based on that message, with the additional headers added."),n("br"),e._v("Previously, a new message was created with the correlation data as its payload, regardless of type."),n("br"),e._v("Also see "),n("a",{attrs:{href:"#alternative-confirms-returns"}},[e._v("Alternative Mechanism for Publisher Confirms and Returns")]),e._v("."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("14")])]),e._v(" "),n("td",[e._v("The channel to which positive ("),n("code",[e._v("ack")]),e._v(") publisher confirmations are sent."),n("br"),e._v("The payload is the correlation data defined by "),n("code",[e._v("confirm-correlation-expression")]),e._v("."),n("br"),e._v("If the expression is "),n("code",[e._v("#root")]),e._v(" or "),n("code",[e._v("#this")]),e._v(", the message is built from the original message, with the "),n("code",[e._v("amqp_publishConfirm")]),e._v(" header set to 
"),n("code",[e._v("true")]),e._v("."),n("br"),e._v("Also see "),n("a",{attrs:{href:"#alternative-confirms-returns"}},[e._v("Alternative Mechanism for Publisher Confirms and Returns")]),e._v("."),n("br"),e._v("Optional (the default is "),n("code",[e._v("nullChannel")]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("15")])]),e._v(" "),n("td",[e._v("The channel to which negative ("),n("code",[e._v("nack")]),e._v(") publisher confirmations are sent."),n("br"),e._v("The payload is the correlation data defined by "),n("code",[e._v("confirm-correlation-expression")]),e._v(" (if there is no "),n("code",[e._v("ErrorMessageStrategy")]),e._v(" configured)."),n("br"),e._v("If the expression is "),n("code",[e._v("#root")]),e._v(" or "),n("code",[e._v("#this")]),e._v(", the message is built from the original message, with the "),n("code",[e._v("amqp_publishConfirm")]),e._v(" header set to "),n("code",[e._v("false")]),e._v("."),n("br"),e._v("When there is an "),n("code",[e._v("ErrorMessageStrategy")]),e._v(", the message is an "),n("code",[e._v("ErrorMessage")]),e._v(" with a "),n("code",[e._v("NackedAmqpMessageException")]),e._v(" payload."),n("br"),e._v("Also see "),n("a",{attrs:{href:"#alternative-confirms-returns"}},[e._v("Alternative Mechanism for Publisher Confirms and Returns")]),e._v("."),n("br"),e._v("Optional (the default is "),n("code",[e._v("nullChannel")]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("16")])]),e._v(" "),n("td",[e._v("When set, the gateway will synthesize a negative acknowledgment (nack) if a publisher confirm is not received within this time in milliseconds."),n("br"),e._v("Pending confirms are checked every 50% of this value, so the actual time a nack is sent will be between 1x and 1.5x this value."),n("br"),e._v("Default none (nacks will not be generated).")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("17")])]),e._v(" "),n("td",[e._v("The channel to which returned messages are sent."),n("br"),e._v("When provided, the 
underlying AMQP template is configured to return undeliverable messages to the adapter."),n("br"),e._v("When there is no "),n("code",[e._v("ErrorMessageStrategy")]),e._v(" configured, the message is constructed from the data received from AMQP, with the following additional headers: "),n("code",[e._v("amqp_returnReplyCode")]),e._v(", "),n("code",[e._v("amqp_returnReplyText")]),e._v(", "),n("code",[e._v("amqp_returnExchange")]),e._v(", and "),n("code",[e._v("amqp_returnRoutingKey")]),e._v("."),n("br"),e._v("When there is an "),n("code",[e._v("ErrorMessageStrategy")]),e._v(", the message is an "),n("code",[e._v("ErrorMessage")]),e._v(" with a "),n("code",[e._v("ReturnedAmqpMessageException")]),e._v(" payload."),n("br"),e._v("Also see "),n("a",{attrs:{href:"#alternative-confirms-returns"}},[e._v("Alternative Mechanism for Publisher Confirms and Returns")]),e._v("."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("18")])]),e._v(" "),n("td",[e._v("A reference to an "),n("code",[e._v("ErrorMessageStrategy")]),e._v(" implementation used to build "),n("code",[e._v("ErrorMessage")]),e._v(" instances when sending returned or negatively acknowledged messages.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("19")])]),e._v(" "),n("td",[e._v("When set to "),n("code",[e._v("false")]),e._v(", the endpoint attempts to connect to the broker during application context initialization."),n("br"),e._v("This allows “fail fast” detection of bad configuration by logging an error message if the broker is down."),n("br"),e._v("When "),n("code",[e._v("true")]),e._v(" (the default), the connection is established (if it does not already exist because some other component established it) when the first message is sent.")])])])]),e._v(" "),n("table",[n("thead",[n("tr",[n("th"),e._v(" "),n("th",[e._v("return-channel"),n("br"),n("br"),e._v("Using a "),n("code",[e._v("return-channel")]),e._v(" requires a "),n("code",[e._v("RabbitTemplate")]),e._v(" with the 
"),n("code",[e._v("mandatory")]),e._v(" property set to "),n("code",[e._v("true")]),e._v(" and a "),n("code",[e._v("CachingConnectionFactory")]),e._v(" with the "),n("code",[e._v("publisherReturns")]),e._v(" property set to "),n("code",[e._v("true")]),e._v("."),n("br"),e._v("When using multiple outbound endpoints with returns, a separate "),n("code",[e._v("RabbitTemplate")]),e._v(" is needed for each endpoint.")])])]),e._v(" "),n("tbody")]),e._v(" "),n("table",[n("thead",[n("tr",[n("th"),e._v(" "),n("th",[e._v("The underlying "),n("code",[e._v("AmqpTemplate")]),e._v(" has a default "),n("code",[e._v("replyTimeout")]),e._v(" of five seconds."),n("br"),e._v("If you require a longer timeout, you must configure it on the "),n("code",[e._v("template")]),e._v(".")])])]),e._v(" "),n("tbody")]),e._v(" "),n("p",[e._v("Note that the only difference between the outbound adapter and outbound gateway configuration is the setting of the"),n("code",[e._v("expectReply")]),e._v(" property.")]),e._v(" "),n("h3",{attrs:{id:"asynchronous-outbound-gateway"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#asynchronous-outbound-gateway"}},[e._v("#")]),e._v(" Asynchronous Outbound Gateway")]),e._v(" "),n("p",[e._v("The gateway discussed in the previous section is synchronous, in that the sending thread is suspended until a\nreply is received (or a timeout occurs).\nSpring Integration version 4.3 added an asynchronous gateway, which uses the "),n("code",[e._v("AsyncRabbitTemplate")]),e._v(" from Spring AMQP.\nWhen a message is sent, the thread returns immediately after the send operation completes, and, when the message is received, the reply is sent on the template’s listener container thread.\nThis can be useful when the gateway is invoked on a poller thread.\nThe thread is released and is available for other tasks in the framework.")]),e._v(" "),n("p",[e._v("The following listing shows the possible configuration options for an AMQP asynchronous outbound gateway:")]),e._v(" 
"),n("p",[e._v("Java DSL")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('@Configuration\npublic class AmqpAsyncApplication {\n\n    @Bean\n    public IntegrationFlow asyncAmqpOutbound(AsyncRabbitTemplate asyncRabbitTemplate) {\n        return f -> f\n                .handle(Amqp.asyncOutboundGateway(asyncRabbitTemplate)\n                        .routingKey("queue1")); // default exchange - route to queue \'queue1\'\n    }\n\n    @MessagingGateway(defaultRequestChannel = "asyncAmqpOutbound.input")\n    public interface MyGateway {\n\n        String sendToRabbit(String data);\n\n    }\n\n}\n')])])]),n("p",[e._v("Java")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('@Configuration\npublic class AmqpAsyncConfig {\n\n    @Bean\n    @ServiceActivator(inputChannel = "amqpOutboundChannel")\n    public AsyncAmqpOutboundGateway amqpOutbound(AsyncRabbitTemplate asyncTemplate) {\n        AsyncAmqpOutboundGateway outbound = new AsyncAmqpOutboundGateway(asyncTemplate);\n        outbound.setRoutingKey("foo"); // default exchange - route to queue \'foo\'\n        return outbound;\n    }\n\n    @Bean\n    public AsyncRabbitTemplate asyncTemplate(RabbitTemplate rabbitTemplate,\n                     SimpleMessageListenerContainer replyContainer) {\n\n        return new AsyncRabbitTemplate(rabbitTemplate, replyContainer);\n    }\n\n    @Bean\n    public SimpleMessageListenerContainer replyContainer() {\n        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(ccf);\n        container.setQueueNames("asyncRQ1");\n        return container;\n    }\n\n    @Bean\n    public MessageChannel amqpOutboundChannel() {\n        return new DirectChannel();\n    }\n\n}\n')])])]),n("p",[e._v("XML")]),e._v(" "),n("div",{staticClass:"language- 
extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('<int-amqp:outbound-async-gateway id="asyncOutboundGateway"    (1)\n                           request-channel="myRequestChannel" (2)\n                           async-template=""                  (3)\n                           exchange-name=""                   (4)\n                           exchange-name-expression=""        (5)\n                           order="1"                          (6)\n                           reply-channel=""                   (7)\n                           reply-timeout=""                   (8)\n                           requires-reply=""                  (9)\n                           routing-key=""                     (10)\n                           routing-key-expression=""          (11)\n                           default-delivery-mode""            (12)\n                           confirm-correlation-expression=""  (13)\n                           confirm-ack-channel=""             (14)\n                           confirm-nack-channel=""            (15)\n                           confirm-timeout=""                 (16)\n                           return-channel=""                  (17)\n                           lazy-connect="true" />             (18)\n')])])]),n("table",[n("thead",[n("tr",[n("th",[n("strong",[e._v("1")])]),e._v(" "),n("th",[e._v("The unique ID for this adapter."),n("br"),e._v("Optional.")])])]),e._v(" "),n("tbody",[n("tr",[n("td",[n("strong",[e._v("2")])]),e._v(" "),n("td",[e._v("Message channel to which messages should be sent in order to have them converted and published to an AMQP exchange."),n("br"),e._v("Required.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("3")])]),e._v(" "),n("td",[e._v("Bean reference to the configured "),n("code",[e._v("AsyncRabbitTemplate")]),e._v("."),n("br"),e._v("Optional (it defaults to "),n("code",[e._v("asyncRabbitTemplate")]),e._v(").")])]),e._v(" 
"),n("tr",[n("td",[n("strong",[e._v("4")])]),e._v(" "),n("td",[e._v("The name of the AMQP exchange to which messages should be sent."),n("br"),e._v("If not provided, messages are sent to the default, no-name exchange."),n("br"),e._v("Mutually exclusive with 'exchange-name-expression'."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("5")])]),e._v(" "),n("td",[e._v("A SpEL expression that is evaluated to determine the name of the AMQP exchange to which messages are sent, with the message as the root object."),n("br"),e._v("If not provided, messages are sent to the default, no-name exchange."),n("br"),e._v("Mutually exclusive with 'exchange-name'."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("6")])]),e._v(" "),n("td",[e._v("The order for this consumer when multiple consumers are registered, thereby enabling load-balancing and failover."),n("br"),e._v("Optional (it defaults to "),n("code",[e._v("Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]")]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("7")])]),e._v(" "),n("td",[e._v("Message channel to which replies should be sent after being received from an AMQP queue and converted."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("8")])]),e._v(" "),n("td",[e._v("The time the gateway waits when sending the reply message to the "),n("code",[e._v("reply-channel")]),e._v("."),n("br"),e._v("This only applies if the "),n("code",[e._v("reply-channel")]),e._v(" can block — such as a "),n("code",[e._v("QueueChannel")]),e._v(" with a capacity limit that is currently full."),n("br"),e._v("The default is infinity.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("9")])]),e._v(" "),n("td",[e._v("When no reply message is received within the "),n("code",[e._v("AsyncRabbitTemplate’s")]),e._v("receiveTimeout"),n("code",[e._v("property and this setting is")]),e._v("true"),n("code",[e._v(", the gateway sends an error message to the inbound 
message’s")]),e._v("errorChannel"),n("code",[e._v("header.<br/>When no reply message is received within the")]),e._v("AsyncRabbitTemplate’s "),n("code",[e._v("receiveTimeout")]),e._v(" property and this setting is "),n("code",[e._v("false")]),e._v(", the gateway sends an error message to the default "),n("code",[e._v("errorChannel")]),e._v(" (if available)."),n("br"),e._v("It defaults to "),n("code",[e._v("true")]),e._v(".")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("10")])]),e._v(" "),n("td",[e._v("The routing-key to use when sending Messages."),n("br"),e._v("By default, this is an empty "),n("code",[e._v("String")]),e._v("."),n("br"),e._v("Mutually exclusive with 'routing-key-expression'."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("11")])]),e._v(" "),n("td",[e._v("A SpEL expression that is evaluated to determine the routing-key to use when sending messages,"),n("br"),e._v("with the message as the root object (for example, 'payload.key')."),n("br"),e._v("By default, this is an empty "),n("code",[e._v("String")]),e._v("."),n("br"),e._v("Mutually exclusive with 'routing-key'."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("12")])]),e._v(" "),n("td",[e._v("The default delivery mode for messages: "),n("code",[e._v("PERSISTENT")]),e._v(" or "),n("code",[e._v("NON_PERSISTENT")]),e._v("."),n("br"),e._v("Overridden if the "),n("code",[e._v("header-mapper")]),e._v(" sets the delivery mode."),n("br"),e._v("If the Spring Integration message header ("),n("code",[e._v("amqp_deliveryMode")]),e._v(") is present, the "),n("code",[e._v("DefaultHeaderMapper")]),e._v(" sets the value."),n("br"),e._v("If this attribute is not supplied and the header mapper does not set it, the default depends on the underlying Spring AMQP "),n("code",[e._v("MessagePropertiesConverter")]),e._v(" used by the "),n("code",[e._v("RabbitTemplate")]),e._v("."),n("br"),e._v("If that is not customized, the default is 
"),n("code",[e._v("PERSISTENT")]),e._v("."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("13")])]),e._v(" "),n("td",[e._v("An expression that defines correlation data."),n("br"),e._v("When provided, this configures the underlying AMQP template to receive publisher confirmations."),n("br"),e._v("Requires a dedicated "),n("code",[e._v("RabbitTemplate")]),e._v(" and a "),n("code",[e._v("CachingConnectionFactory")]),e._v(" with its "),n("code",[e._v("publisherConfirms")]),e._v(" property set to "),n("code",[e._v("true")]),e._v("."),n("br"),e._v("When a publisher confirmation is received and correlation data is supplied, the confirmation is written to either the "),n("code",[e._v("confirm-ack-channel")]),e._v(" or the "),n("code",[e._v("confirm-nack-channel")]),e._v(", depending on the confirmation type."),n("br"),e._v("The payload of the confirmation is the correlation data as defined by this expression, and the message has its 'amqp_publishConfirm' header set to "),n("code",[e._v("true")]),e._v(" ("),n("code",[e._v("ack")]),e._v(") or "),n("code",[e._v("false")]),e._v(" ("),n("code",[e._v("nack")]),e._v(")."),n("br"),e._v("For "),n("code",[e._v("nack")]),e._v(" instances, an additional header ("),n("code",[e._v("amqp_publishConfirmNackCause")]),e._v(") is provided."),n("br"),e._v("Examples: "),n("code",[e._v("headers['myCorrelationData']")]),e._v(", "),n("code",[e._v("payload")]),e._v("."),n("br"),e._v("If the expression resolves to a "),n("code",[e._v("Message<?>")]),e._v(" instance (such as “#this”), the message emitted on the "),n("code",[e._v("ack")]),e._v("/"),n("code",[e._v("nack")]),e._v(" channel is based on that message, with the additional headers added."),n("br"),e._v("Also see "),n("a",{attrs:{href:"#alternative-confirms-returns"}},[e._v("Alternative Mechanism for Publisher Confirms and Returns")]),e._v("."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("14")])]),e._v(" "),n("td",[e._v("The channel to 
which positive ("),n("code",[e._v("ack")]),e._v(") publisher confirmations are sent."),n("br"),e._v("The payload is the correlation data defined by the "),n("code",[e._v("confirm-correlation-expression")]),e._v("."),n("br"),e._v("Requires the underlying "),n("code",[e._v("AsyncRabbitTemplate")]),e._v(" to have its "),n("code",[e._v("enableConfirms")]),e._v(" property set to "),n("code",[e._v("true")]),e._v("."),n("br"),e._v("Also see "),n("a",{attrs:{href:"#alternative-confirms-returns"}},[e._v("Alternative Mechanism for Publisher Confirms and Returns")]),e._v("."),n("br"),e._v("Optional (the default is "),n("code",[e._v("nullChannel")]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("15")])]),e._v(" "),n("td",[e._v("Since version 4.2."),n("br"),e._v("The channel to which negative ("),n("code",[e._v("nack")]),e._v(") publisher confirmations are sent."),n("br"),e._v("The payload is the correlation data defined by the "),n("code",[e._v("confirm-correlation-expression")]),e._v("."),n("br"),e._v("Requires the underlying "),n("code",[e._v("AsyncRabbitTemplate")]),e._v(" to have its "),n("code",[e._v("enableConfirms")]),e._v(" property set to "),n("code",[e._v("true")]),e._v("."),n("br"),e._v("Also see "),n("a",{attrs:{href:"#alternative-confirms-returns"}},[e._v("Alternative Mechanism for Publisher Confirms and Returns")]),e._v("."),n("br"),e._v("Optional (the default is "),n("code",[e._v("nullChannel")]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("16")])]),e._v(" "),n("td",[e._v("When set, the gateway will synthesize a negative acknowledgment (nack) if a publisher confirm is not received within this time in milliseconds."),n("br"),e._v("Pending confirms are checked every 50% of this value, so the actual time a nack is sent will be between 1x and 1.5x this value."),n("br"),e._v("Also see "),n("a",{attrs:{href:"#alternative-confirms-returns"}},[e._v("Alternative Mechanism for Publisher Confirms and Returns")]),e._v("."),n("br"),e._v("Default 
none (nacks will not be generated).")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("17")])]),e._v(" "),n("td",[e._v("The channel to which returned messages are sent."),n("br"),e._v("When provided, the underlying AMQP template is configured to return undeliverable messages to the gateway."),n("br"),e._v("The message is constructed from the data received from AMQP, with the following additional headers: "),n("code",[e._v("amqp_returnReplyCode")]),e._v(", "),n("code",[e._v("amqp_returnReplyText")]),e._v(", "),n("code",[e._v("amqp_returnExchange")]),e._v(", and "),n("code",[e._v("amqp_returnRoutingKey")]),e._v("."),n("br"),e._v("Requires the underlying "),n("code",[e._v("AsyncRabbitTemplate")]),e._v(" to have its "),n("code",[e._v("mandatory")]),e._v(" property set to "),n("code",[e._v("true")]),e._v("."),n("br"),e._v("Also see "),n("a",{attrs:{href:"#alternative-confirms-returns"}},[e._v("Alternative Mechanism for Publisher Confirms and Returns")]),e._v("."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("18")])]),e._v(" "),n("td",[e._v("When set to "),n("code",[e._v("false")]),e._v(", the endpoint tries to connect to the broker during application context initialization."),n("br"),e._v("Doing so allows “fail fast” detection of bad configuration, by logging an error message if the broker is down."),n("br"),e._v("When "),n("code",[e._v("true")]),e._v(" (the default), the connection is established (if it does not already exist because some other component established"),n("br"),e._v("it) when the first message is sent.")])])])]),e._v(" "),n("p",[e._v("See also "),n("RouterLink",{attrs:{to:"/en/spring-integration/service-activator.html#async-service-activator"}},[e._v("Asynchronous Service Activator")]),e._v(" for more information.")],1),e._v(" "),n("table",[n("thead",[n("tr",[n("th"),e._v(" "),n("th",[e._v("RabbitTemplate"),n("br"),n("br"),e._v("When you use confirmations and returns, we recommend that the 
"),n("code",[e._v("RabbitTemplate")]),e._v(" wired into the "),n("code",[e._v("AsyncRabbitTemplate")]),e._v(" be dedicated."),n("br"),e._v("Otherwise, unexpected side-effects may be encountered.")])])]),e._v(" "),n("tbody")]),e._v(" "),n("h3",{attrs:{id:"alternative-mechanism-for-publisher-confirms-and-returns"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#alternative-mechanism-for-publisher-confirms-and-returns"}},[e._v("#")]),e._v(" Alternative Mechanism for Publisher Confirms and Returns")]),e._v(" "),n("p",[e._v("When the connection factory is configured for publisher confirms and returns, the sections above discuss the configuration of message channels to receive the confirms and returns asynchronously.\nStarting with version 5.4, there is an additional mechanism which is generally easier to use.")]),e._v(" "),n("p",[e._v("In this case, do not configure a "),n("code",[e._v("confirm-correlation-expression")]),e._v(" or the confirm and return channels.\nInstead, add a "),n("code",[e._v("CorrelationData")]),e._v(" instance in the "),n("code",[e._v("AmqpHeaders.PUBLISH_CONFIRM_CORRELATION")]),e._v(" header; you can then wait for the result(s) later, by checking the state of the future in the "),n("code",[e._v("CorrelationData")]),e._v(" instances for which you have sent messages.\nThe "),n("code",[e._v("returnedMessage")]),e._v(" field will always be populated (if a message is returned) before the future is completed.")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('CorrelationData corr = new CorrelationData("someId"); // <--- Unique "id" is required for returns\nsomeFlow.getInputChannel().send(MessageBuilder.withPayload("test")\n        .setHeader("rk", "someKeyThatWontRoute")\n        .setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, corr)\n        .build());\n...\ntry {\n    Confirm Confirm = corr.getFuture().get(10, TimeUnit.SECONDS);\n    Message returned = 
corr.getReturnedMessage();\n    if (returned !- null) {\n        // message could not be routed\n    }\n}\ncatch { ... }\n')])])]),n("p",[e._v("To improve performance, you may wish to send multiple messages and wait for the confirmations later, rather than one-at-a-time.\nThe returned message is the raw message after conversion; you can subclass "),n("code",[e._v("CorrelationData")]),e._v(" with whatever additional data you need.")]),e._v(" "),n("h3",{attrs:{id:"inbound-message-conversion"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#inbound-message-conversion"}},[e._v("#")]),e._v(" Inbound Message Conversion")]),e._v(" "),n("p",[e._v("Inbound messages, arriving at the channel adapter or gateway, are converted to the "),n("code",[e._v("spring-messaging")]),e._v(" "),n("code",[e._v("Message<?>")]),e._v(" payload using a message converter.\nBy default, a "),n("code",[e._v("SimpleMessageConverter")]),e._v(" is used, which handles java serialization and text.\nHeaders are mapped using the "),n("code",[e._v("DefaultHeaderMapper.inboundMapper()")]),e._v(" by default.\nIf a conversion error occurs, and there is no error channel defined, the exception is thrown to the container and handled by the listener container’s error handler.\nThe default error handler treats conversion errors as fatal and the message will be rejected (and routed to a dead-letter exchange, if the queue is so configured).\nIf an error channel is defined, the "),n("code",[e._v("ErrorMessage")]),e._v(" payload is a "),n("code",[e._v("ListenerExecutionFailedException")]),e._v(" with properties "),n("code",[e._v("failedMessage")]),e._v(" (the Spring AMQP message that could not be converted) and the "),n("code",[e._v("cause")]),e._v(".\nIf the container "),n("code",[e._v("AcknowledgeMode")]),e._v(" is "),n("code",[e._v("AUTO")]),e._v(" (the default) and the error flow consumes the error without throwing an exception, the original message will be acknowledged.\nIf the error flow throws an exception, 
the exception type, in conjunction with the container’s error handler, will determine whether or not the message is requeued.\nIf the container is configured with "),n("code",[e._v("AcknowledgeMode.MANUAL")]),e._v(", the payload is a "),n("code",[e._v("ManualAckListenerExecutionFailedException")]),e._v(" with additional properties "),n("code",[e._v("channel")]),e._v(" and "),n("code",[e._v("deliveryTag")]),e._v(".\nThis enables the error flow to call "),n("code",[e._v("basicAck")]),e._v(" or "),n("code",[e._v("basicNack")]),e._v(" (or "),n("code",[e._v("basicReject")]),e._v(") for the message, to control its disposition.")]),e._v(" "),n("h3",{attrs:{id:"outbound-message-conversion"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#outbound-message-conversion"}},[e._v("#")]),e._v(" Outbound Message Conversion")]),e._v(" "),n("p",[e._v("Spring AMQP 1.4 introduced the "),n("code",[e._v("ContentTypeDelegatingMessageConverter")]),e._v(", where the actual converter is selected based\non the incoming content type message property.\nThis can be used by inbound endpoints.")]),e._v(" "),n("p",[e._v("As of Spring Integration version 4.3, you can use the "),n("code",[e._v("ContentTypeDelegatingMessageConverter")]),e._v(" on outbound endpoints as well, with the "),n("code",[e._v("contentType")]),e._v(" header specifying which converter is used.")]),e._v(" "),n("p",[e._v("The following example configures a "),n("code",[e._v("ContentTypeDelegatingMessageConverter")]),e._v(", with the default converter being the "),n("code",[e._v("SimpleMessageConverter")]),e._v(" (which handles Java serialization and plain text), together with a JSON converter:")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('<amqp:outbound-channel-adapter id="withContentTypeConverter" channel="ctRequestChannel"\n                               exchange-name="someExchange"\n                               routing-key="someKey"\n           
                    amqp-template="amqpTemplateContentTypeConverter" />\n\n<int:channel id="ctRequestChannel"/>\n\n<rabbit:template id="amqpTemplateContentTypeConverter"\n        connection-factory="connectionFactory" message-converter="ctConverter" />\n\n<bean id="ctConverter"\n        class="o.s.amqp.support.converter.ContentTypeDelegatingMessageConverter">\n    <property name="delegates">\n        <map>\n            <entry key="application/json">\n                <bean class="o.s.amqp.support.converter.Jackson2JsonMessageConverter" />\n            </entry>\n        </map>\n    </property>\n</bean>\n')])])]),n("p",[e._v("Sending a message to "),n("code",[e._v("ctRequestChannel")]),e._v(" with the "),n("code",[e._v("contentType")]),e._v(" header set to "),n("code",[e._v("application/json")]),e._v(" causes the JSON converter to be selected.")]),e._v(" "),n("p",[e._v("This applies to both the outbound channel adapter and gateway.")]),e._v(" "),n("table",[n("thead",[n("tr",[n("th"),e._v(" "),n("th",[e._v("Starting with version 5.0, headers that are added to the "),n("code",[e._v("MessageProperties")]),e._v(" of the outbound message are never overwritten by mapped headers (by default)."),n("br"),e._v("Previously, this was only the case if the message converter was a "),n("code",[e._v("ContentTypeDelegatingMessageConverter")]),e._v(" (in that case, the header was mapped first so that the proper converter could be selected)."),n("br"),e._v("For other converters, such as the "),n("code",[e._v("SimpleMessageConverter")]),e._v(", mapped headers overwrote any headers added by the converter."),n("br"),e._v("This caused problems when an outbound message had some leftover "),n("code",[e._v("contentType")]),e._v(" headers (perhaps from an inbound channel adapter) and the correct outbound "),n("code",[e._v("contentType")]),e._v(" was incorrectly overwritten."),n("br"),e._v("The work-around was to use a header filter to remove the header before sending the message to the outbound 
endpoint."),n("br"),n("br"),e._v("There are, however, cases where the previous behavior is desired — for example, when a "),n("code",[e._v("String")]),e._v(" payload that contains JSON, the "),n("code",[e._v("SimpleMessageConverter")]),e._v(" is not aware of the content and sets the "),n("code",[e._v("contentType")]),e._v(" message property to "),n("code",[e._v("text/plain")]),e._v(" but your application would like to override that to "),n("code",[e._v("application/json")]),e._v(" by setting the "),n("code",[e._v("contentType")]),e._v(" header of the message sent to the outbound endpoint."),n("br"),e._v("The "),n("code",[e._v("ObjectToJsonTransformer")]),e._v(" does exactly that (by default)."),n("br"),n("br"),e._v("There is now a property called "),n("code",[e._v("headersMappedLast")]),e._v(" on the outbound channel adapter and gateway (as well as on AMQP-backed channels)."),n("br"),e._v("Setting this to "),n("code",[e._v("true")]),e._v(" restores the behavior of overwriting the property added by the converter."),n("br"),n("br"),e._v("Starting with version 5.1.9, a similar "),n("code",[e._v("replyHeadersMappedLast")]),e._v(" is provided for the "),n("code",[e._v("AmqpInboundGateway")]),e._v(" when we produce a reply and would like to override headers populated by the converter."),n("br"),e._v("See its JavaDocs for more information.")])])]),e._v(" "),n("tbody")]),e._v(" "),n("h3",{attrs:{id:"outbound-user-id"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#outbound-user-id"}},[e._v("#")]),e._v(" Outbound User ID")]),e._v(" "),n("p",[e._v("Spring AMQP version 1.6 introduced a mechanism to allow the specification of a default user ID for outbound messages.\nIt has always been possible to set the "),n("code",[e._v("AmqpHeaders.USER_ID")]),e._v(" header, which now takes precedence over the default.\nThis might be useful to message recipients.\nFor inbound messages, if the message publisher sets the property, it is made available in the 
"),n("code",[e._v("AmqpHeaders.RECEIVED_USER_ID")]),e._v(" header.\nNote that RabbitMQ "),n("a",{attrs:{href:"https://www.rabbitmq.com/validated-user-id.html",target:"_blank",rel:"noopener noreferrer"}},[e._v("validates that the user ID is the actual user ID for the connection or that the connection allows impersonation"),n("OutboundLink")],1),e._v(".")]),e._v(" "),n("p",[e._v("To configure a default user ID for outbound messages, configure it on a "),n("code",[e._v("RabbitTemplate")]),e._v(" and configure the outbound adapter or gateway to use that template.\nSimilarly, to set the user ID property on replies, inject an appropriately configured template into the inbound gateway.\nSee the "),n("a",{attrs:{href:"https://docs.spring.io/spring-amqp/reference/html/_reference.html#template-user-id",target:"_blank",rel:"noopener noreferrer"}},[e._v("Spring AMQP documentation"),n("OutboundLink")],1),e._v(" for more information.")]),e._v(" "),n("h3",{attrs:{id:"delayed-message-exchange"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#delayed-message-exchange"}},[e._v("#")]),e._v(" Delayed Message Exchange")]),e._v(" "),n("p",[e._v("Spring AMQP supports the "),n("a",{attrs:{href:"https://docs.spring.io/spring-amqp/reference/html/#delayed-message-exchange",target:"_blank",rel:"noopener noreferrer"}},[e._v("RabbitMQ Delayed Message Exchange Plugin"),n("OutboundLink")],1),e._v(".\nFor inbound messages, the "),n("code",[e._v("x-delay")]),e._v(" header is mapped to the "),n("code",[e._v("AmqpHeaders.RECEIVED_DELAY")]),e._v(" header.\nSetting the "),n("code",[e._v("AMQPHeaders.DELAY")]),e._v(" header causes the corresponding "),n("code",[e._v("x-delay")]),e._v(" header to be set in outbound messages.\nYou can also specify the "),n("code",[e._v("delay")]),e._v(" and "),n("code",[e._v("delayExpression")]),e._v(" properties on outbound endpoints ("),n("code",[e._v("delay-expression")]),e._v(" when using XML configuration).\nThese properties take precedence over the 
"),n("code",[e._v("AmqpHeaders.DELAY")]),e._v(" header.")]),e._v(" "),n("h3",{attrs:{id:"amqp-backed-message-channels"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#amqp-backed-message-channels"}},[e._v("#")]),e._v(" AMQP-backed Message Channels")]),e._v(" "),n("p",[e._v("There are two message channel implementations available.\nOne is point-to-point, and the other is publish-subscribe.\nBoth of these channels provide a wide range of configuration attributes for the underlying "),n("code",[e._v("AmqpTemplate")]),e._v(" and"),n("code",[e._v("SimpleMessageListenerContainer")]),e._v(" (as shown earlier in this chapter for the channel adapters and gateways).\nHowever, the examples we show here have minimal configuration.\nExplore the XML schema to view the available attributes.")]),e._v(" "),n("p",[e._v("A point-to-point channel might look like the following example:")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('<int-amqp:channel id="p2pChannel"/>\n')])])]),n("p",[e._v("Under the covers, the preceding example causes a "),n("code",[e._v("Queue")]),e._v(" named "),n("code",[e._v("si.p2pChannel")]),e._v(" to be declared, and this channel sends to that "),n("code",[e._v("Queue")]),e._v(" (technically, by sending to the no-name direct exchange with a routing key that matches the name of this "),n("code",[e._v("Queue")]),e._v(").\nThis channel also registers a consumer on that "),n("code",[e._v("Queue")]),e._v(".\nIf you want the channel to be “pollable” instead of message-driven, provide the "),n("code",[e._v("message-driven")]),e._v(" flag with a value of "),n("code",[e._v("false")]),e._v(", as the following example shows:")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('<int-amqp:channel id="p2pPollableChannel"  message-driven="false"/>\n')])])]),n("p",[e._v("A publish-subscribe channel might look like the 
following:")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('<int-amqp:publish-subscribe-channel id="pubSubChannel"/>\n')])])]),n("p",[e._v("Under the covers, the preceding example causes a fanout exchange named "),n("code",[e._v("si.fanout.pubSubChannel")]),e._v(" to be declared, and this channel sends to that fanout exchange.\nThis channel also declares a server-named exclusive, auto-delete, non-durable "),n("code",[e._v("Queue")]),e._v(" and binds that to the fanout exchange while registering a consumer on that "),n("code",[e._v("Queue")]),e._v(" to receive messages.\nThere is no “pollable” option for a publish-subscribe-channel.\nIt must be message-driven.")]),e._v(" "),n("p",[e._v("Starting with version 4.1, AMQP-backed message channels (in conjunction with "),n("code",[e._v("channel-transacted")]),e._v(") support"),n("code",[e._v("template-channel-transacted")]),e._v(" to separate "),n("code",[e._v("transactional")]),e._v(" configuration for the "),n("code",[e._v("AbstractMessageListenerContainer")]),e._v(" and\nfor the "),n("code",[e._v("RabbitTemplate")]),e._v(".\nNote that, previously, "),n("code",[e._v("channel-transacted")]),e._v(" was "),n("code",[e._v("true")]),e._v(" by default.\nNow, by default, it is "),n("code",[e._v("false")]),e._v(" for the "),n("code",[e._v("AbstractMessageListenerContainer")]),e._v(".")]),e._v(" "),n("p",[e._v("Prior to version 4.3, AMQP-backed channels only supported messages with "),n("code",[e._v("Serializable")]),e._v(" payloads and headers.\nThe entire message was converted (serialized) and sent to RabbitMQ.\nNow, you can set the "),n("code",[e._v("extract-payload")]),e._v(" attribute (or "),n("code",[e._v("setExtractPayload()")]),e._v(" when using Java configuration) to "),n("code",[e._v("true")]),e._v(".\nWhen this flag is "),n("code",[e._v("true")]),e._v(", the message payload is converted and the headers are mapped, in a manner similar to when 
you use channel adapters.\nThis arrangement lets AMQP-backed channels be used with non-serializable payloads (perhaps with another message converter, such as the "),n("code",[e._v("Jackson2JsonMessageConverter")]),e._v(").\nSee "),n("a",{attrs:{href:"#amqp-message-headers"}},[e._v("AMQP Message Headers")]),e._v(" for more about the default mapped headers.\nYou can modify the mapping by providing custom mappers that use the "),n("code",[e._v("outbound-header-mapper")]),e._v(" and "),n("code",[e._v("inbound-header-mapper")]),e._v(" attributes.\nYou can now also specify a "),n("code",[e._v("default-delivery-mode")]),e._v(", which is used to set the delivery mode when there is no "),n("code",[e._v("amqp_deliveryMode")]),e._v(" header.\nBy default, Spring AMQP "),n("code",[e._v("MessageProperties")]),e._v(" uses "),n("code",[e._v("PERSISTENT")]),e._v(" delivery mode.")]),e._v(" "),n("table",[n("thead",[n("tr",[n("th"),e._v(" "),n("th",[e._v("As with other persistence-backed channels, AMQP-backed channels are intended to provide message persistence to avoid message loss."),n("br"),e._v("They are not intended to distribute work to other peer applications."),n("br"),e._v("For that purpose, use channel adapters instead.")])])]),e._v(" "),n("tbody")]),e._v(" "),n("table",[n("thead",[n("tr",[n("th"),e._v(" "),n("th",[e._v("Starting with version 5.0, the pollable channel now blocks the poller thread for the specified "),n("code",[e._v("receiveTimeout")]),e._v(" (the default is 1 second)."),n("br"),e._v("Previously, unlike other "),n("code",[e._v("PollableChannel")]),e._v(" implementations, the thread returned immediately to the scheduler if no message was available, regardless of the receive timeout."),n("br"),e._v("Blocking is a little more expensive than using a "),n("code",[e._v("basicGet()")]),e._v(" to retrieve a message (with no timeout), because a consumer has to be created to receive each message."),n("br"),e._v("To restore the previous behavior, set the poller’s 
"),n("code",[e._v("receiveTimeout")]),e._v(" to 0.")])])]),e._v(" "),n("tbody")]),e._v(" "),n("h4",{attrs:{id:"configuring-with-java-configuration"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#configuring-with-java-configuration"}},[e._v("#")]),e._v(" Configuring with Java Configuration")]),e._v(" "),n("p",[e._v("The following example shows how to configure the channels with Java configuration:")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('@Bean\npublic AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {\n    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();\n    factoryBean.setConnectionFactory(connectionFactory);\n    factoryBean.setQueueName("foo");\n    factoryBean.setPubSub(false);\n    return factoryBean;\n}\n\n@Bean\npublic AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {\n    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);\n    factoryBean.setConnectionFactory(connectionFactory);\n    factoryBean.setQueueName("bar");\n    factoryBean.setPubSub(false);\n    return factoryBean;\n}\n\n@Bean\npublic AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {\n    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);\n    factoryBean.setConnectionFactory(connectionFactory);\n    factoryBean.setQueueName("baz");\n    factoryBean.setPubSub(true);\n    return factoryBean;\n}\n')])])]),n("h4",{attrs:{id:"configuring-with-the-java-dsl"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#configuring-with-the-java-dsl"}},[e._v("#")]),e._v(" Configuring with the Java DSL")]),e._v(" "),n("p",[e._v("The following example shows how to configure the channels with the Java DSL:")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('@Bean\npublic IntegrationFlow pollableInFlow(ConnectionFactory 
connectionFactory) {\n    return IntegrationFlows.from(...)\n            ...\n            .channel(Amqp.pollableChannel(connectionFactory)\n                    .queueName("foo"))\n            ...\n            .get();\n}\n\n@Bean\npublic IntegrationFlow messageDrivenInFlow(ConnectionFactory connectionFactory) {\n    return IntegrationFlows.from(...)\n            ...\n            .channel(Amqp.channel(connectionFactory)\n                    .queueName("bar"))\n            ...\n            .get();\n}\n\n@Bean\npublic IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {\n    return IntegrationFlows.from(...)\n            ...\n            .channel(Amqp.publishSubscribeChannel(connectionFactory)\n                    .queueName("baz"))\n            ...\n            .get();\n}\n')])])]),n("h3",{attrs:{id:"amqp-message-headers"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#amqp-message-headers"}},[e._v("#")]),e._v(" AMQP Message Headers")]),e._v(" "),n("h4",{attrs:{id:"overview-2"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#overview-2"}},[e._v("#")]),e._v(" Overview")]),e._v(" "),n("p",[e._v("The Spring Integration AMQP Adapters automatically map all AMQP properties and headers.\n(This is a change from 4.3 - previously, only standard headers were mapped).\nBy default, these properties are copied to and from Spring Integration "),n("code",[e._v("MessageHeaders")]),e._v(" by using the "),n("a",{attrs:{href:"https://docs.spring.io/spring-integration/api/org/springframework/integration/amqp/support/DefaultAmqpHeaderMapper.html",target:"_blank",rel:"noopener noreferrer"}},[n("code",[e._v("DefaultAmqpHeaderMapper")]),n("OutboundLink")],1),e._v(".")]),e._v(" "),n("p",[e._v("You can pass in your own implementation of AMQP-specific header mappers, as the adapters have properties to support doing so.")]),e._v(" "),n("p",[e._v("Any user-defined headers within the AMQP 
"),n("a",{attrs:{href:"https://docs.spring.io/spring-amqp/api/org/springframework/amqp/core/MessageProperties.html",target:"_blank",rel:"noopener noreferrer"}},[n("code",[e._v("MessageProperties")]),n("OutboundLink")],1),e._v(" are copied to or from an AMQP message, unless explicitly negated by the "),n("code",[e._v("requestHeaderNames")]),e._v(" or "),n("code",[e._v("replyHeaderNames")]),e._v(" properties of the "),n("code",[e._v("DefaultAmqpHeaderMapper")]),e._v(".\nBy default, for an outbound mapper, no "),n("code",[e._v("x-*")]),e._v(" headers are mapped.\nSee the "),n("a",{attrs:{href:"#header-copy-caution"}},[e._v("caution")]),e._v(" that appears later in this section for why.")]),e._v(" "),n("p",[e._v("To override the default and revert to the pre-4.3 behavior, use "),n("code",[e._v("STANDARD_REQUEST_HEADERS")]),e._v(" and "),n("code",[e._v("STANDARD_REPLY_HEADERS")]),e._v(" in the properties.")]),e._v(" "),n("table",[n("thead",[n("tr",[n("th"),e._v(" "),n("th",[e._v("When mapping user-defined headers, the values can also contain simple wildcard patterns (such as "),n("code",[e._v("thing*")]),e._v(" or "),n("code",[e._v("*thing")]),e._v(") to be matched."),n("br"),e._v("The "),n("code",[e._v("*")]),e._v(" matches all headers.")])])]),e._v(" "),n("tbody")]),e._v(" "),n("p",[e._v("Starting with version 4.1, the "),n("code",[e._v("AbstractHeaderMapper")]),e._v(" (a "),n("code",[e._v("DefaultAmqpHeaderMapper")]),e._v(" superclass) lets the "),n("code",[e._v("NON_STANDARD_HEADERS")]),e._v(" token be configured for the "),n("code",[e._v("requestHeaderNames")]),e._v(" and "),n("code",[e._v("replyHeaderNames")]),e._v(" properties (in addition to the existing "),n("code",[e._v("STANDARD_REQUEST_HEADERS")]),e._v(" and "),n("code",[e._v("STANDARD_REPLY_HEADERS")]),e._v(") to map all user-defined headers.")]),e._v(" "),n("p",[e._v("The "),n("code",[e._v("org.springframework.amqp.support.AmqpHeaders")]),e._v(" class identifies the default headers that are used by the 
"),n("code",[e._v("DefaultAmqpHeaderMapper")]),e._v(":")]),e._v(" "),n("ul",[n("li",[n("p",[n("code",[e._v("amqp_appId")])])]),e._v(" "),n("li",[n("p",[n("code",[e._v("amqp_clusterId")])])]),e._v(" "),n("li",[n("p",[n("code",[e._v("amqp_contentEncoding")])])]),e._v(" "),n("li",[n("p",[n("code",[e._v("amqp_contentLength")])])]),e._v(" "),n("li",[n("p",[n("code",[e._v("content-type")]),e._v(" (see "),n("a",{attrs:{href:"#amqp-content-type"}},[e._v("The "),n("code",[e._v("contentType")]),e._v(" Header")]),e._v(")")])]),e._v(" "),n("li",[n("p",[n("code",[e._v("amqp_correlationId")])])]),e._v(" "),n("li",[n("p",[n("code",[e._v("amqp_delay")])])]),e._v(" "),n("li",[n("p",[n("code",[e._v("amqp_deliveryMode")])])]),e._v(" "),n("li",[n("p",[n("code",[e._v("amqp_deliveryTag")])])]),e._v(" "),n("li",[n("p",[n("code",[e._v("amqp_expiration")])])]),e._v(" "),n("li",[n("p",[n("code",[e._v("amqp_messageCount")])])]),e._v(" "),n("li",[n("p",[n("code",[e._v("amqp_messageId")])])]),e._v(" "),n("li",[n("p",[n("code",[e._v("amqp_receivedDelay")])])]),e._v(" "),n("li",[n("p",[n("code",[e._v("amqp_receivedDeliveryMode")])])]),e._v(" "),n("li",[n("p",[n("code",[e._v("amqp_receivedExchange")])])]),e._v(" "),n("li",[n("p",[n("code",[e._v("amqp_receivedRoutingKey")])])]),e._v(" "),n("li",[n("p",[n("code",[e._v("amqp_redelivered")])])]),e._v(" "),n("li",[n("p",[n("code",[e._v("amqp_replyTo")])])]),e._v(" "),n("li",[n("p",[n("code",[e._v("amqp_timestamp")])])]),e._v(" "),n("li",[n("p",[n("code",[e._v("amqp_type")])])]),e._v(" "),n("li",[n("p",[n("code",[e._v("amqp_userId")])])]),e._v(" "),n("li",[n("p",[n("code",[e._v("amqp_publishConfirm")])])]),e._v(" "),n("li",[n("p",[n("code",[e._v("amqp_publishConfirmNackCause")])])]),e._v(" "),n("li",[n("p",[n("code",[e._v("amqp_returnReplyCode")])])]),e._v(" "),n("li",[n("p",[n("code",[e._v("amqp_returnReplyText")])])]),e._v(" "),n("li",[n("p",[n("code",[e._v("amqp_returnExchange")])])]),e._v(" 
"),n("li",[n("p",[n("code",[e._v("amqp_returnRoutingKey")])])]),e._v(" "),n("li",[n("p",[n("code",[e._v("amqp_channel")])])]),e._v(" "),n("li",[n("p",[n("code",[e._v("amqp_consumerTag")])])]),e._v(" "),n("li",[n("p",[n("code",[e._v("amqp_consumerQueue")])])])]),e._v(" "),n("table",[n("thead",[n("tr",[n("th"),e._v(" "),n("th",[e._v("As mentioned earlier in this section, using a header mapping pattern of "),n("code",[e._v("*")]),e._v(" is a common way to copy all headers."),n("br"),e._v("However, this can have some unexpected side effects, because certain RabbitMQ proprietary properties/headers are also copied."),n("br"),e._v("For example, when you use "),n("a",{attrs:{href:"https://www.rabbitmq.com/federated-exchanges.html",target:"_blank",rel:"noopener noreferrer"}},[e._v("federation"),n("OutboundLink")],1),e._v(", the received message may have a property named "),n("code",[e._v("x-received-from")]),e._v(", which contains the node that sent the message."),n("br"),e._v("If you use the wildcard character "),n("code",[e._v("*")]),e._v(" for the request and reply header mapping on the inbound gateway, this header is copied, which may cause some issues with federation."),n("br"),e._v("This reply message may be federated back to the sending broker, which may think that a message is looping and, as a result, silently drop it."),n("br"),e._v("If you wish to use the convenience of wildcard header mapping, you may need to filter out some headers in the downstream flow."),n("br"),e._v("For example, to avoid copying the "),n("code",[e._v("x-received-from")]),e._v(" header back to the reply you can use "),n("code",[e._v('<int:header-filter …​ header-names="x-received-from">')]),e._v(" before sending the reply to the AMQP inbound gateway."),n("br"),e._v("Alternatively, you can explicitly list those properties that you actually want mapped, instead of using wildcards."),n("br"),e._v("For these reasons, for inbound messages, the mapper (by default) does not map any 
"),n("code",[e._v("x-*")]),e._v(" headers."),n("br"),e._v("It also does not map the "),n("code",[e._v("deliveryMode")]),e._v(" to the "),n("code",[e._v("amqp_deliveryMode")]),e._v(" header, to avoid propagation of that header from an inbound message to an outbound message."),n("br"),e._v("Instead, this header is mapped to "),n("code",[e._v("amqp_receivedDeliveryMode")]),e._v(", which is not mapped on output.")])])]),e._v(" "),n("tbody")]),e._v(" "),n("p",[e._v("Starting with version 4.3, patterns in the header mappings can be negated by preceding the pattern with "),n("code",[e._v("!")]),e._v(".\nNegated patterns get priority, so a list such as "),n("code",[e._v("STANDARD_REQUEST_HEADERS,thing1,ba*,!thing2,!thing3,qux,!thing1")]),e._v(" does not map "),n("code",[e._v("thing1")]),e._v(" (nor "),n("code",[e._v("thing2")]),e._v(" nor "),n("code",[e._v("thing3")]),e._v(").\nThe standard headers plus "),n("code",[e._v("bad")]),e._v(" and "),n("code",[e._v("qux")]),e._v(" are mapped.\nThe negation technique can be useful for example to not map JSON type headers for incoming messages when a JSON deserialization logic is done in the receiver downstream different way.\nFor this purpose a "),n("code",[e._v("!json_*")]),e._v(" pattern should be configured for header mapper of the inbound channel adapter/gateway.")]),e._v(" "),n("table",[n("thead",[n("tr",[n("th"),e._v(" "),n("th",[e._v("If you have a user-defined header that begins with "),n("code",[e._v("!")]),e._v(" that you do wish to map, you need to escape it with "),n("code",[e._v("\\")]),e._v(", as follows: "),n("code",[e._v("STANDARD_REQUEST_HEADERS,\\!myBangHeader")]),e._v("."),n("br"),e._v("The header named "),n("code",[e._v("!myBangHeader")]),e._v(" is now mapped.")])])]),e._v(" "),n("tbody")]),e._v(" "),n("table",[n("thead",[n("tr",[n("th"),e._v(" "),n("th",[e._v("Starting with version 5.1, the "),n("code",[e._v("DefaultAmqpHeaderMapper")]),e._v(" will fall back to mapping 
"),n("code",[e._v("MessageHeaders.ID")]),e._v(" and "),n("code",[e._v("MessageHeaders.TIMESTAMP")]),e._v(" to "),n("code",[e._v("MessageProperties.messageId")]),e._v(" and "),n("code",[e._v("MessageProperties.timestamp")]),e._v(" respectively, if the corresponding "),n("code",[e._v("amqp_messageId")]),e._v(" or "),n("code",[e._v("amqp_timestamp")]),e._v(" headers are not present on outbound messages."),n("br"),e._v("Inbound properties will be mapped to the "),n("code",[e._v("amqp_*")]),e._v(" headers as before."),n("br"),e._v("It is useful to populate the "),n("code",[e._v("messageId")]),e._v(" property when message consumers are using stateful retry.")])])]),e._v(" "),n("tbody")]),e._v(" "),n("h4",{attrs:{id:"the-contenttype-header"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#the-contenttype-header"}},[e._v("#")]),e._v(" The "),n("code",[e._v("contentType")]),e._v(" Header")]),e._v(" "),n("p",[e._v("Unlike other headers, the "),n("code",[e._v("AmqpHeaders.CONTENT_TYPE")]),e._v(" is not prefixed with "),n("code",[e._v("amqp_")]),e._v("; this allows transparent passing of the contentType header across different technologies.\nFor example an inbound HTTP message sent to a RabbitMQ queue.")]),e._v(" "),n("p",[e._v("The "),n("code",[e._v("contentType")]),e._v(" header is mapped to Spring AMQP’s "),n("code",[e._v("MessageProperties.contentType")]),e._v(" property and that is subsequently mapped to RabbitMQ’s "),n("code",[e._v("content_type")]),e._v(" property.")]),e._v(" "),n("p",[e._v("Prior to version 5.1, this header was also mapped as an entry in the "),n("code",[e._v("MessageProperties.headers")]),e._v(" map; this was incorrect and, furthermore, the value could be wrong since the underlying Spring AMQP message converter might have changed the content type.\nSuch a change would be reflected in the first-class "),n("code",[e._v("content_type")]),e._v(" property, but not in the RabbitMQ headers map.\nInbound mapping ignored the headers map 
value.\n"),n("code",[e._v("contentType")]),e._v(" is no longer mapped to an entry in the headers map.")]),e._v(" "),n("h3",{attrs:{id:"strict-message-ordering"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#strict-message-ordering"}},[e._v("#")]),e._v(" Strict Message Ordering")]),e._v(" "),n("p",[e._v("This section describes message ordering for inbound and outbound messages.")]),e._v(" "),n("h4",{attrs:{id:"inbound"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#inbound"}},[e._v("#")]),e._v(" Inbound")]),e._v(" "),n("p",[e._v("If you require strict ordering of inbound messages, you must configure the inbound listener container’s "),n("code",[e._v("prefetchCount")]),e._v(" property to "),n("code",[e._v("1")]),e._v(".\nThis is because, if a message fails and is redelivered, it arrives after existing prefetched messages.\nSince Spring AMQP version 2.0, the "),n("code",[e._v("prefetchCount")]),e._v(" defaults to "),n("code",[e._v("250")]),e._v(" for improved performance.\nStrict ordering requirements come at the cost of decreased performance.")]),e._v(" "),n("h4",{attrs:{id:"outbound"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#outbound"}},[e._v("#")]),e._v(" Outbound")]),e._v(" "),n("p",[e._v("Consider the following integration flow:")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('@Bean\npublic IntegrationFlow flow(RabbitTemplate template) {\n    return IntegrationFlows.from(Gateway.class)\n            .split(s -> s.delimiters(","))\n            .<String, String>transform(String::toUpperCase)\n            .handle(Amqp.outboundAdapter(template).routingKey("rk"))\n            .get();\n}\n')])])]),n("p",[e._v("Suppose we send messages "),n("code",[e._v("A")]),e._v(", "),n("code",[e._v("B")]),e._v(" and "),n("code",[e._v("C")]),e._v(" to the gateway.\nWhile it is likely that messages "),n("code",[e._v("A")]),e._v(", "),n("code",[e._v("B")]),e._v(", 
"),n("code",[e._v("C")]),e._v(" are sent in order, there is no guarantee.\nThis is because the template “borrows” a channel from the cache for each send operation, and there is no guarantee that the same channel is used for each message.\nOne solution is to start a transaction before the splitter, but transactions are expensive in RabbitMQ and can reduce performance several hundred fold.")]),e._v(" "),n("p",[e._v("To solve this problem in a more efficient manner, starting with version 5.1, Spring Integration provides the "),n("code",[e._v("BoundRabbitChannelAdvice")]),e._v(" which is a "),n("code",[e._v("HandleMessageAdvice")]),e._v(".\nSee "),n("RouterLink",{attrs:{to:"/en/spring-integration/handler-advice.html#handle-message-advice"}},[e._v("Handling Message Advice")]),e._v(".\nWhen applied before the splitter, it ensures that all downstream operations are performed on the same channel and, optionally, can wait until publisher confirmations for all sent messages are received (if the connection factory is configured for confirmations).\nThe following example shows how to use "),n("code",[e._v("BoundRabbitChannelAdvice")]),e._v(":")],1),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('@Bean\npublic IntegrationFlow flow(RabbitTemplate template) {\n    return IntegrationFlows.from(Gateway.class)\n            .split(s -> s.delimiters(",")\n                    .advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))\n            .<String, String>transform(String::toUpperCase)\n            .handle(Amqp.outboundAdapter(template).routingKey("rk"))\n            .get();\n}\n')])])]),n("p",[e._v("Notice that the same "),n("code",[e._v("RabbitTemplate")]),e._v(" (which implements "),n("code",[e._v("RabbitOperations")]),e._v(") is used in the advice and the outbound adapter.\nThe advice runs the downstream flow within the template’s "),n("code",[e._v("invoke")]),e._v(" method so that all 
operations run on the same channel.\nIf the optional timeout is provided, when the flow completes, the advice calls the "),n("code",[e._v("waitForConfirmsOrDie")]),e._v(" method, which throws an exception if the confirmations are not received within the specified time.")]),e._v(" "),n("table",[n("thead",[n("tr",[n("th"),e._v(" "),n("th",[e._v("There must be no thread handoffs in the downstream flow ("),n("code",[e._v("QueueChannel")]),e._v(", "),n("code",[e._v("ExecutorChannel")]),e._v(", and others).")])])]),e._v(" "),n("tbody")]),e._v(" "),n("h3",{attrs:{id:"amqp-samples"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#amqp-samples"}},[e._v("#")]),e._v(" AMQP Samples")]),e._v(" "),n("p",[e._v("To experiment with the AMQP adapters, check out the samples available in the Spring Integration samples git repository at "),n("a",{attrs:{href:"https://github.com/spring-projects/spring-integration-samples",target:"_blank",rel:"noopener noreferrer"}},[e._v("https://github.com/spring-projects/spring-integration-samples"),n("OutboundLink")],1)]),e._v(" "),n("p",[e._v("Currently, one sample demonstrates the basic functionality of the Spring Integration AMQP adapter by using an outbound channel adapter and an inbound channel adapter.\nThe sample uses "),n("a",{attrs:{href:"https://www.rabbitmq.com/",target:"_blank",rel:"noopener noreferrer"}},[e._v("RabbitMQ"),n("OutboundLink")],1),e._v(" as the AMQP broker implementation.")]),e._v(" "),n("table",[n("thead",[n("tr",[n("th"),e._v(" "),n("th",[e._v("In order to run the example, you need a running instance of RabbitMQ."),n("br"),e._v("A local installation with just the basic defaults suffices."),n("br"),e._v("For detailed RabbitMQ installation procedures, see "),n("a",{attrs:{href:"https://www.rabbitmq.com/install.html",target:"_blank",rel:"noopener noreferrer"}},[e._v("https://www.rabbitmq.com/install.html"),n("OutboundLink")],1)])])]),e._v(" "),n("tbody")]),e._v(" "),n("p",[e._v("Once the sample application is started, enter 
some text on the command prompt and a message containing that entered text is dispatched to the AMQP queue.\nIn return, that message is retrieved by Spring Integration and printed to the console.")]),e._v(" "),n("p",[e._v("The following image illustrates the basic set of Spring Integration components used in this sample.")]),e._v(" "),n("p",[n("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/spring-integration-amqp-sample-graph.png",alt:"spring integration amqp sample graph"}})]),e._v(" "),n("p",[e._v("Figure 1. The Spring Integration graph of the AMQP sample")])])}),[],!1,null,null,null);t.default=r.exports}}]);