(window.webpackJsonp=window.webpackJsonp||[]).push([[105],{530:function(e,t,n){"use strict";n.r(t);var a=n(56),r=Object(a.a)({},(function(){var e=this,t=e.$createElement,n=e._self._c||t;return n("ContentSlotsDistributor",{attrs:{"slot-key":e.$parent.slotKey}},[n("h1",{attrs:{id:"amqp-support"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#amqp-support"}},[e._v("#")]),e._v(" AMQP Support")]),e._v(" "),n("h2",{attrs:{id:"amqp-support-2"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#amqp-support-2"}},[e._v("#")]),e._v(" AMQP Support")]),e._v(" "),n("p",[e._v("Spring Integration provides channel adapters for receiving and sending messages by using the Advanced Message Queuing Protocol (AMQP).")]),e._v(" "),n("p",[e._v("You need to include this dependency into your project:")]),e._v(" "),n("p",[e._v("Maven")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v("\n org.springframework.integration\n spring-integration-amqp\n 5.5.9\n\n")])])]),n("p",[e._v("Gradle")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('compile "org.springframework.integration:spring-integration-amqp:5.5.9"\n')])])]),n("p",[e._v("The following adapters are available:")]),e._v(" "),n("ul",[n("li",[n("p",[n("a",{attrs:{href:"#amqp-inbound-channel-adapter"}},[e._v("Inbound Channel Adapter")])])]),e._v(" "),n("li",[n("p",[n("a",{attrs:{href:"#amqp-inbound-gateway"}},[e._v("Inbound Gateway")])])]),e._v(" "),n("li",[n("p",[n("a",{attrs:{href:"#amqp-outbound-channel-adapter"}},[e._v("Outbound Channel Adapter")])])]),e._v(" "),n("li",[n("p",[n("a",{attrs:{href:"#amqp-outbound-gateway"}},[e._v("Outbound Gateway")])])]),e._v(" "),n("li",[n("p",[n("a",{attrs:{href:"#amqp-async-outbound-gateway"}},[e._v("Async Outbound Gateway")])])])]),e._v(" "),n("p",[e._v("Spring Integration also provides a point-to-point message channel and a publish-subscribe message channel backed by AMQP Exchanges and Queues.")]),e._v(" "),n("p",[e._v("To provide AMQP support, Spring Integration relies on ("),n("a",{attrs:{href:"https://projects.spring.io/spring-amqp",target:"_blank",rel:"noopener noreferrer"}},[e._v("Spring AMQP"),n("OutboundLink")],1),e._v("), which applies core Spring concepts to the development of AMQP-based messaging solutions.\nSpring AMQP provides similar semantics to ("),n("a",{attrs:{href:"https://docs.spring.io/spring/docs/current/spring-framework-reference/integration.html#jms",target:"_blank",rel:"noopener noreferrer"}},[e._v("Spring JMS"),n("OutboundLink")],1),e._v(").")]),e._v(" "),n("p",[e._v("Whereas the provided AMQP Channel Adapters are intended for unidirectional messaging (send or receive) only, Spring Integration also provides inbound and outbound AMQP gateways for request-reply operations.")]),e._v(" "),n("p",[e._v("TIP:\nYou should familiarize yourself with the "),n("a",{attrs:{href:"https://docs.spring.io/spring-amqp/reference/html/",target:"_blank",rel:"noopener noreferrer"}},[e._v("reference documentation of the Spring AMQP project"),n("OutboundLink")],1),e._v(".\nIt provides much more in-depth information about Spring’s integration with AMQP in general and RabbitMQ in particular.")]),e._v(" "),n("h3",{attrs:{id:"inbound-channel-adapter"}},[n("a",{staticClass:"header-anchor",attrs:{href:"#inbound-channel-adapter"}},[e._v("#")]),e._v(" Inbound Channel Adapter")]),e._v(" "),n("p",[e._v("The following listing shows the possible configuration options for an AMQP Inbound Channel Adapter:")]),e._v(" "),n("p",[e._v("Java DSL")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('@Bean\npublic IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {\n return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "aName"))\n .handle(m -> System.out.println(m.getPayload()))\n .get();\n}\n')])])]),n("p",[e._v("Java")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v('@Bean\npublic MessageChannel amqpInputChannel() {\n return new DirectChannel();\n}\n\n@Bean\npublic AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,\n @Qualifier("amqpInputChannel") MessageChannel channel) {\n AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);\n adapter.setOutputChannel(channel);\n return adapter;\n}\n\n@Bean\npublic SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {\n SimpleMessageListenerContainer container =\n new SimpleMessageListenerContainer(connectionFactory);\n container.setQueueNames("aName");\n container.setConcurrentConsumers(2);\n // ...\n return container;\n}\n\n@Bean\n@ServiceActivator(inputChannel = "amqpInputChannel")\npublic MessageHandler handler() {\n return new MessageHandler() {\n\n @Override\n public void handleMessage(Message> message) throws MessagingException {\n System.out.println(message.getPayload());\n }\n\n };\n}\n')])])]),n("p",[e._v("XML")]),e._v(" "),n("div",{staticClass:"language- extra-class"},[n("pre",{pre:!0,attrs:{class:"language-text"}},[n("code",[e._v(' (27)\n')])])]),n("table",[n("thead",[n("tr",[n("th",[n("strong",[e._v("1")])]),e._v(" "),n("th",[e._v("The unique ID for this adapter."),n("br"),e._v("Optional.")])])]),e._v(" "),n("tbody",[n("tr",[n("td",[n("strong",[e._v("2")])]),e._v(" "),n("td",[e._v("Message channel to which converted messages should be sent."),n("br"),e._v("Required.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("3")])]),e._v(" "),n("td",[e._v("Names of the AMQP queues (comma-separated list) from which messages should be consumed."),n("br"),e._v("Required.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("4")])]),e._v(" "),n("td",[e._v("Acknowledge mode for the "),n("code",[e._v("MessageListenerContainer")]),e._v("."),n("br"),e._v("When set to "),n("code",[e._v("MANUAL")]),e._v(", the delivery tag and channel are provided in message headers "),n("code",[e._v("amqp_deliveryTag")]),e._v(" and "),n("code",[e._v("amqp_channel")]),e._v(", respectively."),n("br"),e._v("The user application is responsible for acknowledgement."),n("code",[e._v("NONE")]),e._v(" means no acknowledgements ("),n("code",[e._v("autoAck")]),e._v(")."),n("code",[e._v("AUTO")]),e._v(" means the adapter’s container acknowledges when the downstream flow completes."),n("br"),e._v("Optional (defaults to AUTO)."),n("br"),e._v("See "),n("a",{attrs:{href:"#amqp-inbound-ack"}},[e._v("Inbound Endpoint Acknowledge Mode")]),e._v(".")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("5")])]),e._v(" "),n("td",[e._v("Extra AOP Advices to handle cross-cutting behavior associated with this inbound channel adapter."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("6")])]),e._v(" "),n("td",[e._v("Flag to indicate that channels created by this component are transactional."),n("br"),e._v("If true, it tells the framework to use a transactional channel and to end all operations (send or receive) with a commit or rollback, depending on the outcome, with an exception that signals a rollback."),n("br"),e._v("Optional (Defaults to false).")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("7")])]),e._v(" "),n("td",[e._v("Specify the number of concurrent consumers to create."),n("br"),e._v("The default is "),n("code",[e._v("1")]),e._v("."),n("br"),e._v("We recommend raising the number of concurrent consumers to scale the consumption of messages coming in from a queue."),n("br"),e._v("However, note that any ordering guarantees are lost once multiple consumers are registered."),n("br"),e._v("In general, use one consumer for low-volume queues."),n("br"),e._v("Not allowed when 'consumers-per-queue' is set."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("8")])]),e._v(" "),n("td",[e._v("Bean reference to the RabbitMQ "),n("code",[e._v("ConnectionFactory")]),e._v("."),n("br"),e._v("Optional (defaults to "),n("code",[e._v("connectionFactory")]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("9")])]),e._v(" "),n("td",[e._v("Message channel to which error messages should be sent."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("10")])]),e._v(" "),n("td",[e._v("Whether the listener channel (com.rabbitmq.client.Channel) is exposed to a registered "),n("code",[e._v("ChannelAwareMessageListener")]),e._v("."),n("br"),e._v("Optional (defaults to true).")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("11")])]),e._v(" "),n("td",[e._v("A reference to an "),n("code",[e._v("AmqpHeaderMapper")]),e._v(" to use when receiving AMQP Messages."),n("br"),e._v("Optional."),n("br"),e._v("By default, only standard AMQP properties (such as "),n("code",[e._v("contentType")]),e._v(") are copied to Spring Integration "),n("code",[e._v("MessageHeaders")]),e._v("."),n("br"),e._v("Any user-defined headers within the AMQP "),n("code",[e._v("MessageProperties")]),e._v(" are NOT copied to the message by the default "),n("code",[e._v("DefaultAmqpHeaderMapper")]),e._v("."),n("br"),e._v("Not allowed if 'request-header-names' is provided.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("12")])]),e._v(" "),n("td",[e._v("Comma-separated list of the names of AMQP Headers to be mapped from the AMQP request into the "),n("code",[e._v("MessageHeaders")]),e._v("."),n("br"),e._v("This can only be provided if the 'header-mapper' reference is not provided."),n("br"),e._v('The values in this list can also be simple patterns to be matched against the header names (such as "*" or "thing1*, thing2" or "*something").')])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("13")])]),e._v(" "),n("td",[e._v("Reference to the "),n("code",[e._v("AbstractMessageListenerContainer")]),e._v(" to use for receiving AMQP Messages."),n("br"),e._v("If this attribute is provided, no other attribute related to the listener container configuration should be provided."),n("br"),e._v("In other words, by setting this reference, you must take full responsibility for the listener container configuration."),n("br"),e._v("The only exception is the "),n("code",[e._v("MessageListener")]),e._v(" itself."),n("br"),e._v("Since that is actually the core responsibility of this channel adapter implementation, the referenced listener container must not already have its own "),n("code",[e._v("MessageListener")]),e._v("."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("14")])]),e._v(" "),n("td",[e._v("The "),n("code",[e._v("MessageConverter")]),e._v(" to use when receiving AMQP messages."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("15")])]),e._v(" "),n("td",[e._v("The "),n("code",[e._v("MessagePropertiesConverter")]),e._v(" to use when receiving AMQP messages."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("16")])]),e._v(" "),n("td",[e._v("Specifies the phase in which the underlying "),n("code",[e._v("AbstractMessageListenerContainer")]),e._v(" should be started and stopped."),n("br"),e._v("The startup order proceeds from lowest to highest, and the shutdown order is the reverse of that."),n("br"),e._v("By default, this value is "),n("code",[e._v("Integer.MAX_VALUE")]),e._v(", meaning that this container starts as late as possible and stops as soon as possible."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("17")])]),e._v(" "),n("td",[e._v("Tells the AMQP broker how many messages to send to each consumer in a single request."),n("br"),e._v("Often, you can set this value high to improve throughput."),n("br"),e._v("It should be greater than or equal to the transaction size (see the "),n("code",[e._v("tx-size")]),e._v(" attribute, later in this list)."),n("br"),e._v("Optional (defaults to "),n("code",[e._v("1")]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("18")])]),e._v(" "),n("td",[e._v("Receive timeout in milliseconds."),n("br"),e._v("Optional (defaults to "),n("code",[e._v("1000")]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("19")])]),e._v(" "),n("td",[e._v("Specifies the interval between recovery attempts of the underlying "),n("code",[e._v("AbstractMessageListenerContainer")]),e._v(" (in milliseconds)."),n("br"),e._v("Optional (defaults to "),n("code",[e._v("5000")]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("20")])]),e._v(" "),n("td",[e._v("If 'true' and none of the queues are available on the broker, the container throws a fatal exception during startup and stops if the queues are deleted when the container is running (after making three attempts to passively declare the queues)."),n("br"),e._v("If "),n("code",[e._v("false")]),e._v(", the container does not throw an exception and goes into recovery mode, attempting to restart according to the "),n("code",[e._v("recovery-interval")]),e._v("."),n("br"),e._v("Optional (defaults to "),n("code",[e._v("true")]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("21")])]),e._v(" "),n("td",[e._v("The time to wait for workers (in milliseconds) after the underlying "),n("code",[e._v("AbstractMessageListenerContainer")]),e._v(" is stopped and before the AMQP connection is forced closed."),n("br"),e._v("If any workers are active when the shutdown signal comes, they are allowed to finish processing as long as they can finish within this timeout."),n("br"),e._v("Otherwise, the connection is closed and messages remain unacknowledged (if the channel is transactional)."),n("br"),e._v("Optional (defaults to "),n("code",[e._v("5000")]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("22")])]),e._v(" "),n("td",[e._v("By default, the underlying "),n("code",[e._v("AbstractMessageListenerContainer")]),e._v(" uses a "),n("code",[e._v("SimpleAsyncTaskExecutor")]),e._v(" implementation, that fires up a new thread for each task, running it asynchronously."),n("br"),e._v("By default, the number of concurrent threads is unlimited."),n("br"),e._v("Note that this implementation does not reuse threads."),n("br"),e._v("Consider using a thread-pooling "),n("code",[e._v("TaskExecutor")]),e._v(" implementation as an alternative."),n("br"),e._v("Optional (defaults to "),n("code",[e._v("SimpleAsyncTaskExecutor")]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("23")])]),e._v(" "),n("td",[e._v("By default, the underlying "),n("code",[e._v("AbstractMessageListenerContainer")]),e._v(" creates a new instance of the "),n("code",[e._v("DefaultTransactionAttribute")]),e._v(" (it takes the EJB approach to rolling back on runtime but not checked exceptions)."),n("br"),e._v("Optional (defaults to "),n("code",[e._v("DefaultTransactionAttribute")]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("24")])]),e._v(" "),n("td",[e._v("Sets a bean reference to an external "),n("code",[e._v("PlatformTransactionManager")]),e._v(" on the underlying "),n("code",[e._v("AbstractMessageListenerContainer")]),e._v("."),n("br"),e._v("The transaction manager works in conjunction with the "),n("code",[e._v("channel-transacted")]),e._v(" attribute."),n("br"),e._v("If there is already a transaction in progress when the framework is sending or receiving a message and the "),n("code",[e._v("channelTransacted")]),e._v(" flag is "),n("code",[e._v("true")]),e._v(", the commit or rollback of the messaging transaction is deferred until the end of the current transaction."),n("br"),e._v("If the "),n("code",[e._v("channelTransacted")]),e._v(" flag is "),n("code",[e._v("false")]),e._v(", no transaction semantics apply to the messaging operation (it is auto-acked)."),n("br"),e._v("For further information, see"),n("a",{attrs:{href:"https://docs.spring.io/spring-amqp/reference/html/%255Freference.html#%5Ftransactions",target:"_blank",rel:"noopener noreferrer"}},[e._v("Transactions with Spring AMQP"),n("OutboundLink")],1),e._v("."),n("br"),e._v("Optional.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("25")])]),e._v(" "),n("td",[e._v("Tells the "),n("code",[e._v("SimpleMessageListenerContainer")]),e._v(" how many messages to process in a single transaction (if the channel is transactional)."),n("br"),e._v("For best results, it should be less than or equal to the value set in "),n("code",[e._v("prefetch-count")]),e._v("."),n("br"),e._v("Not allowed when 'consumers-per-queue' is set."),n("br"),e._v("Optional (defaults to "),n("code",[e._v("1")]),e._v(").")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("26")])]),e._v(" "),n("td",[e._v("Indicates that the underlying listener container should be a "),n("code",[e._v("DirectMessageListenerContainer")]),e._v(" instead of the default "),n("code",[e._v("SimpleMessageListenerContainer")]),e._v("."),n("br"),e._v("See the "),n("a",{attrs:{href:"https://docs.spring.io/spring-amqp/reference/html/",target:"_blank",rel:"noopener noreferrer"}},[e._v("Spring AMQP Reference Manual"),n("OutboundLink")],1),e._v(" for more information.")])]),e._v(" "),n("tr",[n("td",[n("strong",[e._v("27")])]),e._v(" "),n("td",[e._v("When the container’s "),n("code",[e._v("consumerBatchEnabled")]),e._v(" is "),n("code",[e._v("true")]),e._v(", determines how the adapter presents the batch of messages in the message payload."),n("br"),e._v("When set to "),n("code",[e._v("MESSAGES")]),e._v(" (default), the payload is a "),n("code",[e._v("List>")]),e._v(" where each message has headers mapped from the incoming AMQP "),n("code",[e._v("Message")]),e._v(" and the payload is the converted "),n("code",[e._v("body")]),e._v("."),n("br"),e._v("When set to "),n("code",[e._v("EXTRACT_PAYLOADS")]),e._v(", the payload is a "),n("code",[e._v("List>")]),e._v(" where the elements are converted from the AMQP "),n("code",[e._v("Message")]),e._v(" body."),n("code",[e._v("EXTRACT_PAYLOADS_WITH_HEADERS")]),e._v(" is similar to "),n("code",[e._v("EXTRACT_PAYLOADS")]),e._v(" but, in addition, the headers from each message are mapped from the "),n("code",[e._v("MessageProperties")]),e._v(" into a "),n("code",[e._v("List