# AMQP Support
## AMQP Support
Spring Integration provides channel adapters for receiving and sending messages by using the Advanced Message Queuing Protocol (AMQP).
You need to include this dependency into your project:
Maven
```
org.springframework.integration
spring-integration-amqp
5.5.9
```
Gradle
```
compile "org.springframework.integration:spring-integration-amqp:5.5.9"
```
The following adapters are available:
* [Inbound Channel Adapter](#amqp-inbound-channel-adapter)
* [Inbound Gateway](#amqp-inbound-gateway)
* [Outbound Channel Adapter](#amqp-outbound-channel-adapter)
* [Outbound Gateway](#amqp-outbound-gateway)
* [Async Outbound Gateway](#amqp-async-outbound-gateway)
Spring Integration also provides a point-to-point message channel and a publish-subscribe message channel backed by AMQP Exchanges and Queues.
To provide AMQP support, Spring Integration relies on ([Spring AMQP](https://projects.spring.io/spring-amqp)), which applies core Spring concepts to the development of AMQP-based messaging solutions.
Spring AMQP provides similar semantics to ([Spring JMS](https://docs.spring.io/spring/docs/current/spring-framework-reference/integration.html#jms)).
Whereas the provided AMQP Channel Adapters are intended for unidirectional messaging (send or receive) only, Spring Integration also provides inbound and outbound AMQP gateways for request-reply operations.
TIP:
You should familiarize yourself with the [reference documentation of the Spring AMQP project](https://docs.spring.io/spring-amqp/reference/html/).
It provides much more in-depth information about Spring’s integration with AMQP in general and RabbitMQ in particular.
### Inbound Channel Adapter
The following listing shows the possible configuration options for an AMQP Inbound Channel Adapter:
Java DSL
```
@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "aName"))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
```
Java
```
@Bean
public MessageChannel amqpInputChannel() {
return new DirectChannel();
}
@Bean
public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
@Qualifier("amqpInputChannel") MessageChannel channel) {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(channel);
return adapter;
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("aName");
container.setConcurrentConsumers(2);
// ...
return container;
}
@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
```
XML
```
(27)
```
|**1** | The unique ID for this adapter.
Optional. |
|------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|**2** | Message channel to which converted messages should be sent.
Required. |
|**3** | Names of the AMQP queues (comma-separated list) from which messages should be consumed.
Required. |
|**4** | Acknowledge mode for the `MessageListenerContainer`.
When set to `MANUAL`, the delivery tag and channel are provided in message headers `amqp_deliveryTag` and `amqp_channel`, respectively.
The user application is responsible for acknowledgement.`NONE` means no acknowledgements (`autoAck`).`AUTO` means the adapter’s container acknowledges when the downstream flow completes.
Optional (defaults to AUTO).
See [Inbound Endpoint Acknowledge Mode](#amqp-inbound-ack). |
|**5** | Extra AOP Advices to handle cross-cutting behavior associated with this inbound channel adapter.
Optional. |
|**6** | Flag to indicate that channels created by this component are transactional.
If true, it tells the framework to use a transactional channel and to end all operations (send or receive) with a commit or rollback, depending on the outcome, with an exception that signals a rollback.
Optional (Defaults to false). |
|**7** | Specify the number of concurrent consumers to create.
The default is `1`.
We recommend raising the number of concurrent consumers to scale the consumption of messages coming in from a queue.
However, note that any ordering guarantees are lost once multiple consumers are registered.
In general, use one consumer for low-volume queues.
Not allowed when 'consumers-per-queue' is set.
Optional. |
|**8** | Bean reference to the RabbitMQ `ConnectionFactory`.
Optional (defaults to `connectionFactory`). |
|**9** | Message channel to which error messages should be sent.
Optional. |
|**10**| Whether the listener channel (com.rabbitmq.client.Channel) is exposed to a registered `ChannelAwareMessageListener`.
Optional (defaults to true). |
|**11**| A reference to an `AmqpHeaderMapper` to use when receiving AMQP Messages.
Optional.
By default, only standard AMQP properties (such as `contentType`) are copied to Spring Integration `MessageHeaders`.
Any user-defined headers within the AMQP `MessageProperties` are NOT copied to the message by the default `DefaultAmqpHeaderMapper`.
Not allowed if 'request-header-names' is provided. |
|**12**| Comma-separated list of the names of AMQP Headers to be mapped from the AMQP request into the `MessageHeaders`.
This can only be provided if the 'header-mapper' reference is not provided.
The values in this list can also be simple patterns to be matched against the header names (such as "\*" or "thing1\*, thing2" or "\*something"). |
|**13**| Reference to the `AbstractMessageListenerContainer` to use for receiving AMQP Messages.
If this attribute is provided, no other attribute related to the listener container configuration should be provided.
In other words, by setting this reference, you must take full responsibility for the listener container configuration.
The only exception is the `MessageListener` itself.
Since that is actually the core responsibility of this channel adapter implementation, the referenced listener container must not already have its own `MessageListener`.
Optional. |
|**14**| The `MessageConverter` to use when receiving AMQP messages.
Optional. |
|**15**| The `MessagePropertiesConverter` to use when receiving AMQP messages.
Optional. |
|**16**| Specifies the phase in which the underlying `AbstractMessageListenerContainer` should be started and stopped.
The startup order proceeds from lowest to highest, and the shutdown order is the reverse of that.
By default, this value is `Integer.MAX_VALUE`, meaning that this container starts as late as possible and stops as soon as possible.
Optional. |
|**17**| Tells the AMQP broker how many messages to send to each consumer in a single request.
Often, you can set this value high to improve throughput.
It should be greater than or equal to the transaction size (see the `tx-size` attribute, later in this list).
Optional (defaults to `1`). |
|**18**| Receive timeout in milliseconds.
Optional (defaults to `1000`). |
|**19**| Specifies the interval between recovery attempts of the underlying `AbstractMessageListenerContainer` (in milliseconds).
Optional (defaults to `5000`). |
|**20**| If 'true' and none of the queues are available on the broker, the container throws a fatal exception during startup and stops if the queues are deleted when the container is running (after making three attempts to passively declare the queues).
If `false`, the container does not throw an exception and goes into recovery mode, attempting to restart according to the `recovery-interval`.
Optional (defaults to `true`). |
|**21**| The time to wait for workers (in milliseconds) after the underlying `AbstractMessageListenerContainer` is stopped and before the AMQP connection is forced closed.
If any workers are active when the shutdown signal comes, they are allowed to finish processing as long as they can finish within this timeout.
Otherwise, the connection is closed and messages remain unacknowledged (if the channel is transactional).
Optional (defaults to `5000`). |
|**22**| By default, the underlying `AbstractMessageListenerContainer` uses a `SimpleAsyncTaskExecutor` implementation, that fires up a new thread for each task, running it asynchronously.
By default, the number of concurrent threads is unlimited.
Note that this implementation does not reuse threads.
Consider using a thread-pooling `TaskExecutor` implementation as an alternative.
Optional (defaults to `SimpleAsyncTaskExecutor`). |
|**23**| By default, the underlying `AbstractMessageListenerContainer` creates a new instance of the `DefaultTransactionAttribute` (it takes the EJB approach to rolling back on runtime but not checked exceptions).
Optional (defaults to `DefaultTransactionAttribute`). |
|**24**|Sets a bean reference to an external `PlatformTransactionManager` on the underlying `AbstractMessageListenerContainer`.
The transaction manager works in conjunction with the `channel-transacted` attribute.
If there is already a transaction in progress when the framework is sending or receiving a message and the `channelTransacted` flag is `true`, the commit or rollback of the messaging transaction is deferred until the end of the current transaction.
If the `channelTransacted` flag is `false`, no transaction semantics apply to the messaging operation (it is auto-acked).
For further information, see[Transactions with Spring AMQP](https://docs.spring.io/spring-amqp/reference/html/%255Freference.html#%5Ftransactions).
Optional.|
|**25**| Tells the `SimpleMessageListenerContainer` how many messages to process in a single transaction (if the channel is transactional).
For best results, it should be less than or equal to the value set in `prefetch-count`.
Not allowed when 'consumers-per-queue' is set.
Optional (defaults to `1`). |
|**26**| Indicates that the underlying listener container should be a `DirectMessageListenerContainer` instead of the default `SimpleMessageListenerContainer`.
See the [Spring AMQP Reference Manual](https://docs.spring.io/spring-amqp/reference/html/) for more information. |
|**27**| When the container’s `consumerBatchEnabled` is `true`, determines how the adapter presents the batch of messages in the message payload.
When set to `MESSAGES` (default), the payload is a `List>` where each message has headers mapped from the incoming AMQP `Message` and the payload is the converted `body`.
When set to `EXTRACT_PAYLOADS`, the payload is a `List>` where the elements are converted from the AMQP `Message` body.`EXTRACT_PAYLOADS_WITH_HEADERS` is similar to `EXTRACT_PAYLOADS` but, in addition, the headers from each message are mapped from the `MessageProperties` into a `List