# Core Messaging

## Messaging Channels

### Message Channels

While the `Message` plays the crucial role of encapsulating data, it is the `MessageChannel` that decouples message producers from message consumers.

#### The MessageChannel Interface

Spring Integration’s top-level `MessageChannel` interface is defined as follows:

```
public interface MessageChannel {

    boolean send(Message<?> message);

    boolean send(Message<?> message, long timeout);
}
```

When sending a message, the return value is `true` if the message is sent successfully. If the send call times out or is interrupted, it returns `false`.

##### `PollableChannel`

Since message channels may or may not buffer messages (as discussed in the [Spring Integration Overview](./overview.html#overview)), two sub-interfaces define the buffering (pollable) and non-buffering (subscribable) channel behavior. The following listing shows the definition of the `PollableChannel` interface:

```
public interface PollableChannel extends MessageChannel {

    Message<?> receive();

    Message<?> receive(long timeout);
}
```

As with the send methods, a timeout or interrupt is reported through the return value: when receiving a message, the return value is `null` in either case.

##### `SubscribableChannel`

The `SubscribableChannel` base interface is implemented by channels that send messages directly to their subscribed `MessageHandler` instances. Therefore, they do not provide receive methods for polling. Instead, they define methods for managing those subscribers. The following listing shows the definition of the `SubscribableChannel` interface:

```
public interface SubscribableChannel extends MessageChannel {

    boolean subscribe(MessageHandler handler);

    boolean unsubscribe(MessageHandler handler);
}
```

#### Message Channel Implementations

Spring Integration provides several different message channel implementations. The following sections briefly describe each one.
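Before turning to the individual implementations, the difference between the pollable and subscribable contracts shown above can be sketched with plain JDK types. This is only an illustrative model: `ChannelContractSketch`, `BufferingChannel`, and `PushingChannel` are hypothetical names, payloads are plain `String` values instead of `Message` instances, and none of this is Spring Integration code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-ins for illustration only; NOT the Spring Integration types.
class ChannelContractSketch {

    // Buffering ("pollable") style: send() stores the payload, a consumer asks for it later.
    static class BufferingChannel {
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        boolean send(String payload) {
            return queue.offer(payload);
        }

        // Returns null on timeout or interrupt, mirroring the receive(long) contract.
        String receive(long timeoutMillis) {
            try {
                return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    // Non-buffering ("subscribable") style: send() pushes straight to the subscribers.
    static class PushingChannel {
        private final List<Consumer<String>> handlers = new ArrayList<>();

        boolean subscribe(Consumer<String> handler) {
            return handlers.add(handler);
        }

        boolean unsubscribe(Consumer<String> handler) {
            return handlers.remove(handler);
        }

        boolean send(String payload) {
            handlers.forEach(handler -> handler.accept(payload));
            return true;
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();

        BufferingChannel buffering = new BufferingChannel();
        buffering.send("a");
        log.add("polled:" + buffering.receive(10)); // the message was buffered
        log.add("polled:" + buffering.receive(10)); // queue is empty, so the poll times out

        PushingChannel pushing = new PushingChannel();
        pushing.subscribe(payload -> log.add("pushed:" + payload));
        pushing.send("b");                          // the handler runs during send()
        return log;
    }
}
```

In the buffering model the consumer decides when to ask for a message, whereas in the pushing model there is nothing to poll: the handler is invoked as part of the send call.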
##### `PublishSubscribeChannel`

The `PublishSubscribeChannel` implementation broadcasts any `Message` sent to it to all of its subscribed handlers. This is most often used for sending event messages, whose primary role is notification (as opposed to document messages, which are generally intended to be processed by a single handler). Note that the `PublishSubscribeChannel` is intended for sending only. Since it broadcasts to its subscribers directly when its `send(Message)` method is invoked, consumers cannot poll for messages (it does not implement `PollableChannel` and therefore has no `receive()` method). Instead, any subscriber must itself be a `MessageHandler`, and the subscriber’s `handleMessage(Message)` method is invoked in turn.

Prior to version 3.0, invoking the `send` method on a `PublishSubscribeChannel` that had no subscribers returned `false`. When used in conjunction with a `MessagingTemplate`, a `MessageDeliveryException` was thrown. Starting with version 3.0, the behavior has changed such that a `send` is always considered successful if at least the minimum number of subscribers is present (and they successfully handle the message). This behavior can be modified by setting the `minSubscribers` property, which defaults to `0`.

| |If you use a `TaskExecutor`, only the presence of the correct number of subscribers is used for this determination, because the actual handling of the message is performed asynchronously.|
|---|---|

##### `QueueChannel`

The `QueueChannel` implementation wraps a queue. Unlike the `PublishSubscribeChannel`, the `QueueChannel` has point-to-point semantics. In other words, even if the channel has multiple consumers, only one of them should receive any `Message` sent to that channel.
It provides a default no-argument constructor (providing an essentially unbounded capacity of `Integer.MAX_VALUE`) as well as a constructor that accepts the queue capacity, as the following listing shows:

```
public QueueChannel(int capacity)
```

A channel that has not reached its capacity limit stores messages in its internal queue, and the `send(Message)` method returns immediately, even if no receiver is ready to handle the message. If the queue has reached capacity, the sender blocks until room is available in the queue. Alternatively, if you use the send method that has an additional timeout parameter, the sender blocks until either room is available or the timeout period elapses, whichever occurs first. Similarly, a `receive()` call returns immediately if a message is available on the queue, but, if the queue is empty, then a receive call may block until either a message is available or the timeout, if provided, elapses. In either case, it is possible to force an immediate return regardless of the queue’s state by passing a timeout value of 0. Note, however, that calls to the versions of `send()` and `receive()` with no `timeout` parameter block indefinitely.

##### `PriorityChannel`

Whereas the `QueueChannel` enforces first-in-first-out (FIFO) ordering, the `PriorityChannel` is an alternative implementation that allows for messages to be ordered within the channel based upon a priority. By default, the priority is determined by the `priority` header within each message. However, for custom priority determination logic, a comparator of type `Comparator<Message<?>>` can be provided to the `PriorityChannel` constructor.

##### `RendezvousChannel`

The `RendezvousChannel` enables a “direct-handoff” scenario, wherein a sender blocks until another party invokes the channel’s `receive()` method. The other party blocks until the sender sends the message.
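The hand-off behavior described above can be modeled with the JDK's `java.util.concurrent.SynchronousQueue`. The following is a minimal sketch rather than Spring Integration code: `HandoffSketch` and its method names are hypothetical, and payloads are plain strings.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// Plain-JDK illustration of a direct hand-off; not Spring Integration code.
class HandoffSketch {

    // With no receiver waiting, a non-blocking offer fails immediately
    // because there is no buffer in which to park the message.
    static boolean sendWithoutReceiver() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        return queue.offer("orphan");
    }

    // With a receiver blocked in take(), the sender's timed offer succeeds
    // and the receiver obtains exactly that message.
    static String sendWithReceiver() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        String[] received = new String[1];
        Thread receiver = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        receiver.start();
        try {
            boolean accepted = queue.offer("hello", 5, TimeUnit.SECONDS);
            receiver.join(5000); // join() makes the receiver's write visible here
            return accepted ? received[0] : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

A successful send in this model therefore implies that some receiver has actually taken the message, which is the guarantee that distinguishes a hand-off from a buffered queue.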
Internally, this implementation is quite similar to the `QueueChannel`, except that it uses a `SynchronousQueue` (a zero-capacity implementation of `BlockingQueue`). This works well in situations where the sender and receiver operate in different threads, but asynchronously dropping the message in a queue is not appropriate. In other words, with a `RendezvousChannel`, the sender knows that some receiver has accepted the message, whereas with a `QueueChannel`, the message would have been stored to the internal queue and potentially never received. | |Keep in mind that all of these queue-based channels are storing messages in-memory only by default.
When persistence is required, you can either provide a 'message-store' attribute within the 'queue' element to reference a persistent `MessageStore` implementation or you can replace the local channel with one that is backed by a persistent broker, such as a JMS-backed channel or channel adapter.
The latter option lets you take advantage of any JMS provider’s implementation for message persistence, as discussed in [JMS Support](./jms.html#jms).
However, when buffering in a queue is not necessary, the simplest approach is to rely upon the `DirectChannel`, discussed in the next section.|
|---|---|

The `RendezvousChannel` is also useful for implementing request-reply operations. The sender can create a temporary, anonymous instance of `RendezvousChannel`, which it then sets as the `replyChannel` header when building a `Message`. After sending that `Message`, the sender can immediately call `receive` (optionally providing a timeout value) in order to block while waiting for a reply `Message`. This is very similar to the implementation used internally by many of Spring Integration’s request-reply components.

##### `DirectChannel`

The `DirectChannel` has point-to-point semantics but otherwise is more similar to the `PublishSubscribeChannel` than any of the queue-based channel implementations described earlier. It implements the `SubscribableChannel` interface instead of the `PollableChannel` interface, so it dispatches messages directly to a subscriber. As a point-to-point channel, however, it differs from the `PublishSubscribeChannel` in that it sends each `Message` to a single subscribed `MessageHandler`.
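The point-to-point, push-style delivery described above can be sketched as follows. This is a simplified, hypothetical model (`PointToPointSketch` is not a Spring Integration class): choosing the handler by rotation is an assumption of the sketch, and returning `false` when nothing is subscribed is a simplification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model for illustration only; not the Spring Integration DirectChannel.
class PointToPointSketch {
    private final List<Consumer<String>> handlers = new ArrayList<>();
    private int next = 0; // rotation index; rotating is an assumption of this sketch

    void subscribe(Consumer<String> handler) {
        handlers.add(handler);
    }

    // Delivers the payload to exactly one handler, synchronously.
    boolean send(String payload) {
        if (handlers.isEmpty()) {
            return false; // simplification: nothing is subscribed
        }
        Consumer<String> chosen = handlers.get(next);
        next = (next + 1) % handlers.size();
        chosen.accept(payload);
        return true;
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        PointToPointSketch channel = new PointToPointSketch();
        channel.subscribe(payload -> log.add("first:" + payload));
        channel.subscribe(payload -> log.add("second:" + payload));
        channel.send("m1");
        channel.send("m2");
        channel.send("m3");
        return log; // each message reached one handler only
    }
}
```

Contrast this with a broadcast channel, where all three messages would have reached both handlers.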
In addition to being the simplest point-to-point channel option, one of its most important features is that it enables a single thread to perform the operations on “both sides” of the channel. For example, if a handler subscribes to a `DirectChannel`, then sending a `Message` to that channel triggers invocation of that handler’s `handleMessage(Message)` method directly in the sender’s thread, before the `send()` method invocation can return. The key motivation for providing a channel implementation with this behavior is to support transactions that must span across the channel while still benefiting from the abstraction and loose coupling that the channel provides. If the send call is invoked within the scope of a transaction, the outcome of the handler’s invocation (for example, updating a database record) plays a role in determining the ultimate result of that transaction (commit or rollback). | |Since the `DirectChannel` is the simplest option and does not add any additional overhead that would be required for scheduling and managing the threads of a poller, it is the default channel type within Spring Integration.
The general idea is to define the channels for an application, consider which of those need to provide buffering or to throttle input, and modify those to be queue-based `PollableChannels`.
Likewise, if a channel needs to broadcast messages, it should not be a `DirectChannel` but rather a `PublishSubscribeChannel`.
Later, we show how each of these channels can be configured.|
|---|---|

A `FixedSubscriberChannel` is a `SubscribableChannel` that only supports a single `MessageHandler` subscriber that cannot be unsubscribed. This is useful for high-throughput performance use-cases when no other subscribers are involved and no channel interceptors are needed.

The `DirectChannel` internally delegates to a message dispatcher to invoke its subscribed message handlers, and that dispatcher can have a load-balancing strategy exposed by `load-balancer` or `load-balancer-ref` attributes (mutually exclusive). The load balancing strategy is used by the message dispatcher to help determine how messages are distributed amongst message handlers when multiple message handlers subscribe to the same channel. As a convenience, the `load-balancer` attribute exposes an enumeration of values pointing to pre-existing implementations of `LoadBalancingStrategy`. The only available values are `round-robin` (load-balances across the handlers in rotation) and `none` (for the cases where one wants to explicitly disable load balancing). Other strategy implementations may be added in future versions. However, since version 3.0, you can provide your own implementation of the `LoadBalancingStrategy` and inject it by using the `load-balancer-ref` attribute, which should point to a bean that implements `LoadBalancingStrategy`, as the following example shows:

``` ```

Note that the `load-balancer` and `load-balancer-ref` attributes are mutually exclusive.

The load-balancing also works in conjunction with a boolean `failover` property. If the `failover` value is true (the default), the dispatcher falls back to any subsequent handlers (as necessary) when preceding handlers throw exceptions. The order is determined by an optional order value defined on the handlers themselves or, if no such value exists, the order in which the handlers subscribed.

If a certain situation requires that the dispatcher always try to invoke the first handler and then fall back in the same fixed order sequence every time an error occurs, no load-balancing strategy should be provided. In other words, the dispatcher still supports the `failover` boolean property even when no load-balancing is enabled. Without load-balancing, however, the invocation of handlers always begins with the first, according to their order. For example, this approach works well when there is a clear definition of primary, secondary, tertiary, and so on. When using the namespace support, the `order` attribute on any endpoint determines the order.

| |Keep in mind that load-balancing and `failover` apply only when a channel has more than one subscribed message handler.
When using the namespace support, this means that more than one endpoint shares the same channel reference defined in the `input-channel` attribute.|
|---|---|

Starting with version 5.2, when `failover` is true, a failure of the current handler is logged together with the failed message at the `debug` or `info` level, depending on which of the two is configured.

##### `ExecutorChannel`

The `ExecutorChannel` is a point-to-point channel that supports the same dispatcher configuration as `DirectChannel` (load-balancing strategy and the `failover` boolean property). The key difference between these two dispatching channel types is that the `ExecutorChannel` delegates to an instance of `TaskExecutor` to perform the dispatch. This means that the send method typically does not block, but it also means that the handler invocation may not occur in the sender’s thread. It therefore does not support transactions that span the sender and receiving handler.

| |The sender can sometimes block.
For example, when using a `TaskExecutor` with a rejection policy that throttles the client (such as the `ThreadPoolExecutor.CallerRunsPolicy`), the sender’s thread can execute the method any time the thread pool is at its maximum capacity and the executor’s work queue is full.
Since that situation would only occur in a non-predictable way, you should not rely upon it for transactions.|
|---|---|

##### `FluxMessageChannel`

The `FluxMessageChannel` is an `org.reactivestreams.Publisher` implementation for “sinking” sent messages into an internal `reactor.core.publisher.Flux` for on-demand consumption by reactive subscribers downstream. This channel implementation is neither a `SubscribableChannel` nor a `PollableChannel`, so only `org.reactivestreams.Subscriber` instances can be used to consume from this channel, which honors the back-pressure nature of reactive streams. On the other hand, the `FluxMessageChannel` implements `ReactiveStreamsSubscribableChannel`, whose `subscribeTo(Publisher<Message<?>>)` contract allows it to receive events from reactive source publishers, bridging a reactive stream into the integration flow. To achieve fully reactive behavior for the whole integration flow, such a channel must be placed between all the endpoints in the flow.

See [Reactive Streams Support](./reactive-streams.html#reactive-streams) for more information about interaction with Reactive Streams.

##### Scoped Channel

Spring Integration 1.0 provided a `ThreadLocalChannel` implementation, but that has been removed as of 2.0. Now the more general way to handle the same requirement is to add a `scope` attribute to a channel. The value of the attribute can be the name of a scope that is available within the context.
For example, in a web environment, certain scopes are available, and any custom scope implementations can be registered with the context. The following example shows a thread-local scope being applied to a channel, including the registration of the scope itself:

``` ```

The channel defined in the previous example also delegates to a queue internally, but the channel is bound to the current thread, so the contents of the queue are similarly bound. That way, the thread that sends to the channel can later receive those same messages, but no other thread would be able to access them. While thread-scoped channels are rarely needed, they can be useful in situations where `DirectChannel` instances are being used to enforce a single thread of operation but any reply messages should be sent to a “terminal” channel. If that terminal channel is thread-scoped, the original sending thread can collect its replies from the terminal channel.

Now, since any channel can be scoped, you can define your own scopes in addition to thread-local.

#### Channel Interceptors

One of the advantages of a messaging architecture is the ability to provide common behavior and capture meaningful information about the messages passing through the system in a non-invasive way. Since the `Message` instances are sent to and received from `MessageChannel` instances, those channels provide an opportunity for intercepting the send and receive operations.
The `ChannelInterceptor` strategy interface, shown in the following listing, provides methods for each of those operations:

```
public interface ChannelInterceptor {

    Message<?> preSend(Message<?> message, MessageChannel channel);

    void postSend(Message<?> message, MessageChannel channel, boolean sent);

    void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex);

    boolean preReceive(MessageChannel channel);

    Message<?> postReceive(Message<?> message, MessageChannel channel);

    void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex);
}
```

After implementing the interface, registering the interceptor with a channel is just a matter of making the following call:

```
channel.addInterceptor(someChannelInterceptor);
```

The methods that return a `Message` instance can be used for transforming the `Message` or can return `null` to prevent further processing (of course, any of the methods can throw a `RuntimeException`). Also, the `preReceive` method can return `false` to prevent the receive operation from proceeding.

| |Keep in mind that `receive()` calls are only relevant for `PollableChannels`.
In fact, the `SubscribableChannel` interface does not even define a `receive()` method.
The reason for this is that when a `Message` is sent to a `SubscribableChannel`, it is sent directly to zero or more subscribers, depending on the type of channel (for example,
a `PublishSubscribeChannel` sends to all of its subscribers).
Therefore, the `preReceive(…)`, `postReceive(…)`, and `afterReceiveCompletion(…)` interceptor methods are invoked only when the interceptor is applied to a `PollableChannel`.|
|---|---|

Spring Integration also provides an implementation of the [Wire Tap](https://www.enterpriseintegrationpatterns.com/WireTap.html) pattern. It is a simple interceptor that sends the `Message` to another channel without otherwise altering the existing flow. It can be very useful for debugging and monitoring. An example is shown in [Wire Tap](#channel-wiretap).

Because it is rarely necessary to implement all of the interceptor methods, the interface provides no-op methods (the `void` methods have no code, the `Message`-returning methods return the `Message` as-is, and the `boolean` method returns `true`).

| |The order of invocation for the interceptor methods depends on the type of channel.
As described earlier, the queue-based channels are the only ones where the receive method is intercepted in the first place.
Additionally, the relationship between send and receive interception depends on the timing of the separate sender and receiver threads.
For example, if a receiver is already blocked while waiting for a message, the order could be as follows: `preSend`, `preReceive`, `postReceive`, `postSend`.
However, if a receiver polls after the sender has placed a message on the channel and has already returned, the order would be as follows: `preSend`, `postSend` (some-time-elapses), `preReceive`, `postReceive`.
The time that elapses in such a case depends on a number of factors and is therefore generally unpredictable (in fact, the receive may never happen).
The type of queue also plays a role (for example, rendezvous versus priority).
In short, you cannot rely on the order beyond the fact that `preSend` precedes `postSend` and `preReceive` precedes `postReceive`.|
|---|---|

Starting with Spring Framework 4.1 and Spring Integration 4.1, the `ChannelInterceptor` provides new methods: `afterSendCompletion()` and `afterReceiveCompletion()`. They are invoked after `send()` and `receive()` calls, regardless of any exception that is raised, which allows for resource cleanup. Note that the channel invokes these methods on the `ChannelInterceptor` list in the reverse order of the initial `preSend()` and `preReceive()` calls.

Starting with version 5.1, global channel interceptors now apply to dynamically registered channels, such as beans that are initialized by using `beanFactory.initializeBean()` or through the `IntegrationFlowContext` when using the Java DSL.
Previously, interceptors were not applied when beans were created after the application context was refreshed.

Also, starting with version 5.1, `ChannelInterceptor.postReceive()` is no longer called when no message is received; it is no longer necessary to check for a `null` `Message`. Previously, the method was called even in that case. If you have an interceptor that relies on the previous behavior, implement `afterReceiveCompletion()` instead, since that method is invoked regardless of whether a message is received or not.

| |Starting with version 5.2, the `ChannelInterceptorAware` is deprecated in favor of `InterceptableChannel` from the Spring Messaging module, which it extends now for backward compatibility.|
|---|---|

#### `MessagingTemplate`

As becomes clear when the endpoints and their various configuration options are introduced, Spring Integration provides a foundation for messaging components that enables non-invasive invocation of your application code from the messaging system. However, it is sometimes necessary to invoke the messaging system from your application code. For convenience when implementing such use cases, Spring Integration provides a `MessagingTemplate` that supports a variety of operations across the message channels, including request and reply scenarios. For example, it is possible to send a request and wait for a reply, as follows:

```
MessagingTemplate template = new MessagingTemplate();

Message<?> reply = template.sendAndReceive(someChannel, new GenericMessage<String>("test"));
```

In the preceding example, a temporary anonymous channel is created internally by the template. The `sendTimeout` and `receiveTimeout` properties may also be set on the template, and other exchange types are also supported.
The following listing shows the signatures for such methods:

```
public boolean send(final MessageChannel channel, final Message<?> message) { ...
}

public Message<?> sendAndReceive(final MessageChannel channel, final Message<?> request) { ...
}

public Message<?> receive(final PollableChannel channel) { ...
}
```

| |A less invasive approach that lets you invoke simple interfaces with payload or header values instead of `Message` instances is described in [Enter the `GatewayProxyFactoryBean`](./gateway.html#gateway-proxy).|
|---|---|

#### Configuring Message Channels

To create a message channel instance, you can use the `` element for XML or a `DirectChannel` instance for Java configuration, as follows:

Java

```
@Bean
public MessageChannel exampleChannel() {
    return new DirectChannel();
}
```

XML

``` ```

When you use the `` element without any sub-elements, it creates a `DirectChannel` instance (a `SubscribableChannel`).

To create a publish-subscribe channel, use the `` element (the `PublishSubscribeChannel` in Java), as follows:

Java

```
@Bean
public MessageChannel exampleChannel() {
    return new PublishSubscribeChannel();
}
```

XML

``` ```

You can alternatively provide a variety of `` sub-elements to create any of the pollable channel types (as described in [Message Channel Implementations](#channel-implementations)). The following sections show examples of each channel type.

##### `DirectChannel` Configuration

As mentioned earlier, `DirectChannel` is the default type. The following listing shows how to define one:

Java

```
@Bean
public MessageChannel directChannel() {
    return new DirectChannel();
}
```

XML

``` ```

A default channel has a round-robin load-balancer and also has failover enabled (see [`DirectChannel`](#channel-implementations-directchannel) for more detail).
To disable one or both of these, add a `` sub-element (in Java, use the `DirectChannel` constructor that accepts a `LoadBalancingStrategy`) and configure the attributes as follows:

Java

```
// Failover disabled; the default round-robin load balancing stays in place.
@Bean
public MessageChannel failFastChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setFailover(false);
    return channel;
}

// No load-balancing strategy (null); failover keeps its default of true.
// The bean name is illustrative; it only needs to differ from the one above.
@Bean
public MessageChannel channelWithFixedOrderSequenceFailover() {
    return new DirectChannel(null);
}
```

XML

``` ```

##### Datatype Channel Configuration

Sometimes, a consumer can process only a particular type of payload, forcing you to ensure the payload type of the input messages. The first thing that comes to mind may be to use a message filter. However, all that a message filter can do is filter out messages that are not compliant with the requirements of the consumer. Another way would be to use a content-based router and route messages with non-compliant data types to specific transformers to enforce transformation and conversion to the required data type. This would work, but a simpler way to accomplish the same thing is to apply the [Datatype Channel](https://www.enterpriseintegrationpatterns.com/DatatypeChannel.html) pattern. You can use separate datatype channels for each specific payload data type.

To create a datatype channel that accepts only messages that contain a certain payload type, provide the data type’s fully-qualified class name in the channel element’s `datatype` attribute, as the following example shows:

Java

```
@Bean
public MessageChannel numberChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setDatatypes(Number.class);
    return channel;
}
```

XML

``` ```

Note that the type check passes for any type that is assignable to the channel’s datatype. In other words, the `numberChannel` in the preceding example would accept messages whose payload is `java.lang.Integer` or `java.lang.Double`.
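The assignability rule can be illustrated with plain reflection. The following is a minimal sketch of the idea only, not the channel's actual implementation; `DatatypeCheckSketch` and `accepts` are hypothetical names.

```java
// Illustrative only: models the "assignable to the channel's datatype" rule.
class DatatypeCheckSketch {

    // Returns true when the payload is an instance of at least one accepted type.
    static boolean accepts(Object payload, Class<?>... datatypes) {
        for (Class<?> type : datatypes) {
            if (type.isAssignableFrom(payload.getClass())) {
                return true;
            }
        }
        return false;
    }
}
```

With `Number.class` as the only accepted type, `accepts(5, Number.class)` and `accepts(2.5, Number.class)` both pass because `Integer` and `Double` are subclasses of `Number`, whereas `accepts("5", Number.class)` fails.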
Multiple types can be provided as a comma-delimited list, as the following example shows:

Java

```
// Named differently from the single-type example so that both beans can coexist.
@Bean
public MessageChannel stringOrNumberChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setDatatypes(String.class, Number.class);
    return channel;
}
```

XML

``` ```

By contrast, the `numberChannel` defined in the first example accepts only messages with a data type of `java.lang.Number`. But what happens if the payload of the message is not of the required type? It depends on whether you have defined a bean named `integrationConversionService` that is an instance of Spring’s [Conversion Service](https://docs.spring.io/spring/docs/current/spring-framework-reference/html/validation.html#core-convert-ConversionService-API). If not, then an `Exception` would be thrown immediately. However, if you have defined an `integrationConversionService` bean, it is used in an attempt to convert the message’s payload to the acceptable type.

You can even register custom converters. For example, suppose you send a message with a `String` payload to the `numberChannel` we configured above. You might handle the message as follows:

```
MessageChannel inChannel = context.getBean("numberChannel", MessageChannel.class);
inChannel.send(new GenericMessage<String>("5"));
```

Typically, this would be a perfectly legal operation. However, since we use a datatype channel, the operation results in an exception similar to the following:

```
Exception in thread "main" org.springframework.integration.MessageDeliveryException: Channel 'numberChannel' expected one of the following datataypes [class java.lang.Number], but received [class java.lang.String]
…
```

The exception happens because we require the payload type to be a `Number`, but we sent a `String`. So we need something to convert a `String` to a `Number`.
For that, we can implement a converter similar to the following example:

```
import org.springframework.core.convert.converter.Converter;

public static class StringToIntegerConverter implements Converter<String, Integer> {

    // Parses the String payload; throws NumberFormatException for non-numeric input.
    @Override
    public Integer convert(String source) {
        return Integer.parseInt(source);
    }
}
```

Then we can register it as a converter with the Integration Conversion Service, as the following example shows:

Java

```
@Bean
@IntegrationConverter
public StringToIntegerConverter strToInt() {
    return new StringToIntegerConverter();
}
```

XML

``` ```

Alternatively, you can place the `@IntegrationConverter` annotation on the `StringToIntegerConverter` class itself when it is marked with the `@Component` annotation for auto-scanning.

When the 'converter' element is parsed, it creates the `integrationConversionService` bean if one is not already defined. With that converter in place, the `send` operation would now be successful, because the datatype channel uses that converter to convert the `String` payload to an `Integer`.

For more information regarding payload type conversion, see [Payload Type Conversion](./endpoint.html#payload-type-conversion).

Beginning with version 4.0, the `integrationConversionService` is invoked by the `DefaultDatatypeChannelMessageConverter`, which looks up the conversion service in the application context. To use a different conversion technique, you can specify the `message-converter` attribute on the channel. This must be a reference to a `MessageConverter` implementation. Only the `fromMessage` method is used. It provides the converter with access to the message headers (in case the conversion might need information from the headers, such as `content-type`). The method can return only the converted payload or a full `Message` object. If the latter, the converter must be careful to copy all the headers from the inbound message.

Alternatively, you can declare a `` of type `MessageConverter` with an ID of `datatypeChannelMessageConverter`, and that converter is used by all channels with a `datatype`.

##### `QueueChannel` Configuration

To create a `QueueChannel`, use the `` sub-element.
You may specify the channel’s capacity as follows: Java ``` @Bean public PollableChannel queueChannel() { return new QueueChannel(25); } ``` XML ``` ``` | |If you do not provide a value for the 'capacity' attribute on this `` sub-element, the resulting queue is unbounded.
To avoid issues such as running out of memory, we highly recommend that you set an explicit value for a bounded queue.| |---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ###### Persistent `QueueChannel` Configuration Since a `QueueChannel` provides the capability to buffer messages but does so in-memory only by default, it also introduces a possibility that messages could be lost in the event of a system failure. To mitigate this risk, a `QueueChannel` may be backed by a persistent implementation of the `MessageGroupStore` strategy interface. For more details on `MessageGroupStore` and `MessageStore`, see [Message Store](./message-store.html#message-store). | |The `capacity` attribute is not allowed when the `message-store` attribute is used.| |---|-----------------------------------------------------------------------------------| When a `QueueChannel` receives a `Message`, it adds the message to the message store. When a `Message` is polled from a `QueueChannel`, it is removed from the message store. By default, a `QueueChannel` stores its messages in an in-memory queue, which can lead to the lost message scenario mentioned earlier. However, Spring Integration provides persistent stores, such as the `JdbcChannelMessageStore`. You can configure a message store for any `QueueChannel` by adding the `message-store` attribute, as the following example shows: ``` ``` (See samples below for Java/Kotlin Configuration options.) The Spring Integration JDBC module also provides a schema Data Definition Language (DDL) for a number of popular databases. These schemas are located in the org.springframework.integration.jdbc.store.channel package of that module (`spring-integration-jdbc`). 
| |One important feature is that, with any transactional persistent store (such as `JdbcChannelMessageStore`), as long as the poller has a transaction configured, a message removed from the store can be permanently removed only if the transaction completes successfully.
Otherwise the transaction rolls back, and the `Message` is not lost.| |---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| Many other implementations of the message store are available as the growing number of Spring projects related to “NoSQL” data stores come to provide underlying support for these stores. You can also provide your own implementation of the `MessageGroupStore` interface if you cannot find one that meets your particular needs. Since version 4.0, we recommend that `QueueChannel` instances be configured to use a `ChannelMessageStore`, if possible. These are generally optimized for this use, as compared to a general message store. If the `ChannelMessageStore` is a `ChannelPriorityMessageStore`, the messages are received in FIFO within priority order. The notion of priority is determined by the message store implementation. For example, the following example shows the Java configuration for the [MongoDB Channel Message Store](./mongodb.html#mongodb-priority-channel-message-store): Java ``` @Bean public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) { MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory); store.setPriorityEnabled(true); return store; } @Bean public PollableChannel priorityQueue(BasicMessageGroupStore mongoDbChannelMessageStore) { return new PriorityChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "priorityQueue")); } ``` Java DSL ``` @Bean public IntegrationFlow priorityFlow(PriorityCapableChannelMessageStore mongoDbChannelMessageStore) { return IntegrationFlows.from((Channels c) -> c.priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup")) .... 
.get(); } ``` Kotlin DSL ``` @Bean fun priorityFlow(mongoDbChannelMessageStore: PriorityCapableChannelMessageStore) = integrationFlow { channel { priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup") } } ``` | |Pay attention to the `MessageGroupQueue` class.
That is a `BlockingQueue` implementation to use the `MessageGroupStore` operations.| |---|---------------------------------------------------------------------------------------------------------------------------------------| Another option to customize the `QueueChannel` environment is provided by the `ref` attribute of the `` sub-element or its particular constructor. This attribute supplies the reference to any `java.util.Queue` implementation. For example, a Hazelcast distributed [`IQueue`](https://hazelcast.com/use-cases/imdg/imdg-messaging/) can be configured as follows: ``` @Bean public HazelcastInstance hazelcastInstance() { return Hazelcast.newHazelcastInstance(new Config() .setProperty("hazelcast.logging.type", "log4j")); } @Bean public PollableChannel distributedQueue() { return new QueueChannel(hazelcastInstance() .getQueue("springIntegrationQueue")); } ``` ##### `PublishSubscribeChannel` Configuration To create a `PublishSubscribeChannel`, use the \ element. When using this element, you can also specify the `task-executor` used for publishing messages (if none is specified, it publishes in the sender’s thread), as follows: Java ``` @Bean public MessageChannel pubsubChannel() { return new PublishSubscribeChannel(someExecutor()); } ``` XML ``` ``` If you provide a resequencer or aggregator downstream from a `PublishSubscribeChannel`, you can set the 'apply-sequence' property on the channel to `true`. Doing so indicates that the channel should set the `sequence-size` and `sequence-number` message headers as well as the correlation ID prior to passing along the messages. For example, if there are five subscribers, the `sequence-size` would be set to `5`, and the messages would have `sequence-number` header values ranging from `1` to `5`. Along with the `Executor`, you can also configure an `ErrorHandler`. 
By default, the `PublishSubscribeChannel` uses a `MessagePublishingErrorHandler` implementation to send an error to the `MessageChannel` from the `errorChannel` header or into the global `errorChannel` instance. If an `Executor` is not configured, the `ErrorHandler` is ignored and exceptions are thrown directly to the caller’s thread. The following example shows how to set the `apply-sequence` property to `true`: Java ``` @Bean public MessageChannel pubsubChannel() { PublishSubscribeChannel channel = new PublishSubscribeChannel(); channel.setApplySequence(true); return channel; } ``` XML ``` ``` | |The `apply-sequence` value is `false` by default so that a publish-subscribe channel can send the exact same message instances to multiple outbound channels.
Since Spring Integration enforces immutability of the payload and header references, when the flag is set to `true`, the channel creates new `Message` instances with the same payload reference but different header values.| |---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| Starting with version 5.4.3, the `PublishSubscribeChannel` can also be configured with the `requireSubscribers` option of its `BroadcastingDispatcher` to indicate that this channel will not ignore a message silently when it has no subscribers. A `MessageDispatchingException` with a `Dispatcher has no subscribers` message is thrown when there are no subscribers and this option is set to `true`. ##### `ExecutorChannel` To create an `ExecutorChannel`, add the `` sub-element with a `task-executor` attribute. The attribute’s value can reference any `TaskExecutor` within the context. For example, doing so enables configuration of a thread pool for dispatching messages to subscribed handlers. As mentioned earlier, doing so breaks the single-threaded execution context between sender and receiver so that any active transaction context is not shared by the invocation of the handler (that is, the handler may throw an `Exception`, but the `send` invocation has already returned successfully). 
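The hand-off described above can be sketched with plain JDK types. This is not Spring Integration code: the `Handler` interface, the `send` method, and the class name are hypothetical stand-ins, and the sketch only assumes that dispatching happens on an executor thread. It shows why the sender sees a successful send even though the handler later throws on another thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class ExecutorHandoffSketch {

    /** Hypothetical stand-in for a subscribed handler; not a Spring Integration type. */
    interface Handler {
        void handle(String payload);
    }

    /**
     * Mimics an executor-based hand-off: the caller only submits the work,
     * so it gets 'true' back before the handler has run on the other thread.
     */
    static boolean send(ExecutorService executor, Handler handler, String payload,
            AtomicReference<Throwable> failure, CountDownLatch done) {
        executor.execute(() -> {
            try {
                handler.handle(payload);
            }
            catch (RuntimeException ex) {
                // The sender never sees this exception; it is only recorded here.
                failure.set(ex);
            }
            finally {
                done.countDown();
            }
        });
        return true;
    }

    /** Runs the sketch once and reports what the sender and the handler thread observed. */
    static String run() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicReference<Throwable> failure = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            boolean sent = send(executor, payload -> {
                throw new IllegalStateException("handler failed for " + payload);
            }, "order-1", failure, done);
            // Wait for the handler thread so that the recorded failure is visible.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("handler did not finish in time");
            }
            return "sent=" + sent + ", handlerError=" + failure.get().getMessage();
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the exception surfaces only on the executor thread, a transaction that is active on the sending thread cannot be rolled back by it, which is the trade-off the paragraph above describes.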
The following example shows how to use the `dispatcher` element and specify an executor in the `task-executor` attribute: Java ``` @Bean public MessageChannel executorChannel() { return new ExecutorChannel(someExecutor()); } ``` XML ``` ``` | |The `load-balancer` and `failover` options are also both available on the \ sub-element, as described earlier in [`DirectChannel` Configuration](#channel-configuration-directchannel).
The same defaults apply.
Consequently, the channel has a round-robin load-balancing strategy with failover enabled unless explicit configuration is provided for one or both of those attributes, as the following example shows:

```



```| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ##### `PriorityChannel` Configuration To create a `PriorityChannel`, use the `` sub-element, as the following example shows: Java ``` @Bean public PollableChannel priorityChannel() { return new PriorityChannel(20); } ``` XML ``` ``` By default, the channel consults the `priority` header of the message. However, you can instead provide a custom `Comparator` reference. Also, note that the `PriorityChannel` (like the other types) does support the `datatype` attribute. As with the `QueueChannel`, it also supports a `capacity` attribute. The following example demonstrates all of these: Java ``` @Bean public PollableChannel priorityChannel() { PriorityChannel channel = new PriorityChannel(20, widgetComparator()); channel.setDatatypes(example.Widget.class); return channel; } ``` XML ``` ``` Since version 4.0, the `priority-channel` child element supports the `message-store` option (`comparator` and `capacity` are not allowed in that case). The message store must be a `PriorityCapableChannelMessageStore`. Implementations of the `PriorityCapableChannelMessageStore` are currently provided for `Redis`, `JDBC`, and `MongoDB`. See [`QueueChannel` Configuration](#channel-configuration-queuechannel) and [Message Store](./message-store.html#message-store) for more information. You can find sample configuration in [Backing Message Channels](./jdbc.html#jdbc-message-store-channels). 
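To make the effect of a priority ordering concrete, the following JDK-only sketch drains a comparator-backed queue. It is not Spring Integration code: the `Msg` class, the comparator, and the class name are hypothetical, and the sketch assumes that higher priority values are received first. Note that `PriorityBlockingQueue` is unbounded, so the `20` here is only an initial capacity, unlike the `capacity` of a channel.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityOrderSketch {

    /** Hypothetical message stand-in: a payload plus a numeric priority value. */
    static final class Msg {
        final String payload;
        final int priority;

        Msg(String payload, int priority) {
            this.payload = payload;
            this.priority = priority;
        }
    }

    /** Assumption for this sketch: a higher priority value is received earlier. */
    static final Comparator<Msg> HIGHEST_FIRST =
            (a, b) -> Integer.compare(b.priority, a.priority);

    /** Adds all messages to a comparator-backed queue and polls them back in queue order. */
    static List<String> receiveAll(List<Msg> sent) {
        // 20 is an initial capacity only; this JDK queue does not enforce a bound.
        PriorityBlockingQueue<Msg> queue = new PriorityBlockingQueue<>(20, HIGHEST_FIRST);
        queue.addAll(sent);
        List<String> received = new ArrayList<>();
        Msg next;
        while ((next = queue.poll()) != null) {
            received.add(next.payload);
        }
        return received;
    }

    public static void main(String[] args) {
        List<Msg> sent = Arrays.asList(new Msg("low", 1), new Msg("high", 9), new Msg("mid", 5));
        System.out.println(receiveAll(sent)); // prints [high, mid, low]
    }
}
```

Swapping in a different `Comparator` changes the receive order without touching the sending code, which is the same design idea behind supplying a custom comparator to the channel.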
##### `RendezvousChannel` Configuration A `RendezvousChannel` is created when the queue sub-element is a ``. It does not provide any additional configuration options to those described earlier, and its queue does not accept any capacity value, since it is a zero-capacity direct handoff queue. The following example shows how to declare a `RendezvousChannel`: Java ``` @Bean public PollableChannel rendezvousChannel() { return new RendezvousChannel(); } ``` XML ``` ``` ##### Scoped Channel Configuration Any channel can be configured with a `scope` attribute, as the following example shows: ``` ``` ##### Channel Interceptor Configuration Message channels may also have interceptors, as described in [Channel Interceptors](#channel-interceptors). The `` sub-element can be added to a `` (or the more specific element types). You can provide the `ref` attribute to reference any Spring-managed object that implements the `ChannelInterceptor` interface, as the following example shows: ``` ``` In general, we recommend defining the interceptor implementations in a separate location, since they usually provide common behavior that can be reused across multiple channels. ##### Global Channel Interceptor Configuration Channel interceptors provide a clean and concise way of applying cross-cutting behavior per individual channel. If the same behavior should be applied on multiple channels, configuring the same set of interceptors for each channel would not be the most efficient way. To avoid repeated configuration while also enabling interceptors to apply to multiple channels, Spring Integration provides global interceptors. Consider the following pair of examples: ``` ``` ``` ``` Each `` element lets you define a global interceptor, which is applied on all channels that match any patterns defined by the `pattern` attribute. 
In the preceding case, the global interceptor is applied on the 'thing1' channel and all other channels that begin with 'thing2' or 'input' but not to channels starting with 'thing3' (since version 5.0). | |The addition of this syntax to the pattern causes one possible (though perhaps unlikely) problem.
If you have a bean named `!thing1` and you included a pattern of `!thing1` in your channel interceptor’s `pattern` attribute, that pattern no longer matches the bean.
The pattern now matches all beans not named `thing1`.
In this case, you can escape the `!` in the pattern with `\`.
The pattern `\!thing1` matches a bean named `!thing1`.| |---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| The order attribute lets you manage where this interceptor is injected when there are multiple interceptors on a given channel. For example, channel 'inputChannel' could have individual interceptors configured locally (see below), as the following example shows: ``` ``` A reasonable question is “how is a global interceptor injected in relation to other interceptors configured locally or through other global interceptor definitions?” The current implementation provides a simple mechanism for defining the order of interceptor execution. A positive number in the `order` attribute ensures interceptor injection after any existing interceptors, while a negative number ensures that the interceptor is injected before existing interceptors. This means that, in the preceding example, the global interceptor is injected after (since its `order` is greater than `0`) the 'wire-tap' interceptor configured locally. If there were another global interceptor with a matching `pattern`, its order would be determined by comparing the values of both interceptors' `order` attributes. To inject a global interceptor before the existing interceptors, use a negative value for the `order` attribute. | |Note that both the `order` and `pattern` attributes are optional.
The default value for `order` will be 0 and for `pattern`, the default is '\*' (to match all channels).| |---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ##### Wire Tap As mentioned earlier, Spring Integration provides a simple wire tap interceptor. You can configure a wire tap on any channel within an `` element. Doing so is especially useful for debugging and can be used in conjunction with Spring Integration’s logging channel adapter as follows: ``` ``` | |The 'logging-channel-adapter' also accepts an 'expression' attribute so that you can evaluate a SpEL expression against the 'payload' and 'headers' variables.
Alternatively, to log the full message `toString()` result, provide a value of `true` for the 'log-full-message' attribute.
By default, it is `false` so that only the payload is logged.
Setting it to `true` enables logging of all headers in addition to the payload.
The 'expression' option provides the most flexibility (for example, `expression="payload.user.name"`).| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| One of the common misconceptions about the wire tap and other similar components ([Message Publishing Configuration](./message-publishing.html#message-publishing-config)) is that they are automatically asynchronous in nature. By default, wire tap as a component is not invoked asynchronously. Instead, Spring Integration focuses on a single unified approach to configuring asynchronous behavior: the message channel. What makes certain parts of the message flow synchronous or asynchronous is the type of Message Channel that has been configured within that flow. That is one of the primary benefits of the message channel abstraction. From the inception of the framework, we have always emphasized the need and the value of the message channel as a first-class citizen of the framework. It is not just an internal, implicit realization of the EIP pattern. It is fully exposed as a configurable component to the end user. So, the wire tap component is only responsible for performing the following tasks: * Intercept a message flow by tapping into a channel (for example, `channelA`) * Grab each message * Send the message to another channel (for example, `channelB`) It is essentially a variation of the bridge pattern, but it is encapsulated within a channel definition (and hence easier to enable and disable without disrupting a flow). 
Also, unlike the bridge, it basically forks another message flow. Is that flow synchronous or asynchronous? The answer depends on the type of message channel that 'channelB' is. We have the following options: direct channel, pollable channel, and executor channel. The last two break the thread boundary, making communication over such channels asynchronous, because the dispatching of the message from that channel to its subscribed handlers happens on a different thread than the one used to send the message to that channel. That is what makes your wire-tap flow synchronous or asynchronous. It is consistent with other components within the framework (such as message publisher) and adds a level of consistency and simplicity by sparing you from worrying in advance (other than writing thread-safe code) about whether a particular piece of code should be implemented as synchronous or asynchronous. The actual wiring of two pieces of code (say, component A and component B) over a message channel is what makes their collaboration synchronous or asynchronous. You may even want to change from synchronous to asynchronous in the future, and the message channel lets you do so swiftly without touching the code. One final point regarding the wire tap is that, despite the rationale provided above for not being asynchronous by default, you should keep in mind that it is usually desirable to hand off the message as soon as possible. Therefore, it would be quite common to use an asynchronous channel option as the wire tap’s outbound channel. However, we do not enforce asynchronous behavior by default. There are a number of use cases that would break if we did, including that you might not want to break a transactional boundary. Perhaps you use the wire tap pattern for auditing purposes, and you do want the audit messages to be sent within the original transaction. As an example, you might connect the wire tap to a JMS outbound channel adapter.
That way, you get the best of both worlds: 1) the sending of a JMS Message can occur within the transaction while 2) it is still a “fire-and-forget” action, thereby preventing any noticeable delay in the main message flow. | |Starting with version 4.0, it is important to avoid circular references when an interceptor (such as the [`WireTap` class](https://docs.spring.io/autorepo/docs/spring-integration/current/api/org/springframework/integration/channel/interceptor/WireTap.html)) references a channel.
You need to exclude such channels from those being intercepted by the current interceptor.
This can be done with appropriate patterns or programmatically.
If you have a custom `ChannelInterceptor` that references a `channel`, consider implementing `VetoCapableInterceptor`.
That way, the framework asks the interceptor if it is OK to intercept each channel that is a candidate, based on the supplied pattern.
You can also add runtime protection in the interceptor methods to ensure that the channel is not one that is referenced by the interceptor.
The `WireTap` uses both of these techniques.| |---|---| Starting with version 4.3, the `WireTap` has additional constructors that take a `channelName` instead of a `MessageChannel` instance. This can be convenient for Java configuration and when channel auto-creation logic is being used. The target `MessageChannel` bean is resolved from the provided `channelName` later, on the first interaction with the interceptor. | |Channel resolution requires a `BeanFactory`, so the wire tap instance must be a Spring-managed bean.| |---|---| This late-binding approach also allows simplification of typical wire-tapping patterns with Java DSL configuration, as the following example shows: ``` @Bean public PollableChannel myChannel() { return MessageChannels.queue() .wireTap("loggingFlow.input") .get(); } @Bean public IntegrationFlow loggingFlow() { return f -> f.log(); } ``` ##### Conditional Wire Taps Wire taps can be made conditional by using the `selector` or `selector-expression` attributes.
The `selector` references a `MessageSelector` bean, which can determine at runtime whether the message should go to the tap channel. Similarly, the `selector-expression` is a boolean SpEL expression that performs the same purpose: If the expression evaluates to `true`, the message is sent to the tap channel. ##### Global Wire Tap Configuration It is possible to configure a global wire tap as a special case of the [Global Channel Interceptor Configuration](#global-channel-configuration-interceptors). To do so, configure a top level `wire-tap` element. Now, in addition to the normal `wire-tap` namespace support, the `pattern` and `order` attributes are supported and work in exactly the same way as they do for the `channel-interceptor`. The following example shows how to configure a global wire tap: Java ``` @Bean @GlobalChannelInterceptor(patterns = "input*,thing2*,thing1", order = 3) public WireTap wireTap(MessageChannel wiretapChannel) { return new WireTap(wiretapChannel); } ``` XML ``` ``` | |A global wire tap provides a convenient way to configure a single-channel wire tap externally without modifying the existing channel configuration.
To do so, set the `pattern` attribute to the target channel name.
For example, you can use this technique to configure a test case to verify messages on a channel.| |---|---| #### Special Channels Two special channels are defined within the application context by default: `errorChannel` and `nullChannel`. The 'nullChannel' (an instance of `NullChannel`) acts like `/dev/null`, logging any message sent to it at the `DEBUG` level and returning immediately. A sent message whose payload is an `org.reactivestreams.Publisher` receives special treatment: the channel subscribes to the payload immediately to initiate reactive stream processing, although the data is discarded. An error thrown during reactive stream processing (see `Subscriber.onError(Throwable)`) is logged at the `WARN` level for possible investigation. If you need to do anything with such an error, you can apply the [`ReactiveRequestHandlerAdvice`](./handler-advice.html#reactive-advice) with a `Mono.doOnError()` customization to the message handler that produces a `Mono` reply into this `nullChannel`. Any time you face channel resolution errors for a reply that you do not care about, you can set the affected component’s `output-channel` attribute to 'nullChannel' (the name, 'nullChannel', is reserved within the application context). The 'errorChannel' is used internally for sending error messages and may be overridden with a custom configuration. This is discussed in greater detail in [Error Handling](./error-handling.html#error-handling). See also [Message Channels](./dsl.html#java-dsl-channels) in the Java DSL chapter for more information about message channels and interceptors. ### Poller This section describes how polling works in Spring Integration.
#### Polling Consumer When Message Endpoints (Channel Adapters) are connected to channels and instantiated, they produce one of the following instances: * [`PollingConsumer`](https://docs.spring.io/spring-integration/api/org/springframework/integration/endpoint/PollingConsumer.html) * [`EventDrivenConsumer`](https://docs.spring.io/spring-integration/api/org/springframework/integration/endpoint/EventDrivenConsumer.html) The actual implementation depends on the type of channel to which these endpoints connect. A channel adapter connected to a channel that implements the [`org.springframework.messaging.SubscribableChannel`](https://docs.spring.io/spring/docs/current/javadoc-api/index.html?org/springframework/messaging/SubscribableChannel.html) interface produces an instance of `EventDrivenConsumer`. On the other hand, a channel adapter connected to a channel that implements the [`org.springframework.messaging.PollableChannel`](https://docs.spring.io/spring/docs/current/javadoc-api/index.html?org/springframework/messaging/PollableChannel.html) interface (such as a `QueueChannel`) produces an instance of `PollingConsumer`. Polling consumers let Spring Integration components actively poll for Messages rather than process messages in an event-driven manner. They represent a critical cross-cutting concern in many messaging scenarios. In Spring Integration, polling consumers are based on the pattern with the same name, which is described in the book *Enterprise Integration Patterns*, by Gregor Hohpe and Bobby Woolf. You can find a description of the pattern on the [book’s website](https://www.enterpriseintegrationpatterns.com/PollingConsumer.html). #### Pollable Message Source Spring Integration offers a second variation of the polling consumer pattern. When inbound channel adapters are used, these adapters are often wrapped by a `SourcePollingChannelAdapter`. 
For example, when retrieving messages from a remote FTP Server location, the adapter described in [FTP Inbound Channel Adapter](./ftp.html#ftp-inbound) is configured with a poller to periodically retrieve messages. So, when components are configured with pollers, the resulting instances are of one of the following types: * [`PollingConsumer`](https://docs.spring.io/spring-integration/api/org/springframework/integration/endpoint/PollingConsumer.html) * [`SourcePollingChannelAdapter`](https://docs.spring.io/spring-integration/api/org/springframework/integration/endpoint/SourcePollingChannelAdapter.html) This means that pollers are used in both inbound and outbound messaging scenarios. Here are some use cases in which pollers are used: * Polling certain external systems, such as FTP Servers, Databases, and Web Services * Polling internal (pollable) message channels * Polling internal services (such as repeatedly executing methods on a Java class) | |AOP advice classes can be applied to pollers, in an `advice-chain`, such as a transaction advice to start a transaction.
Starting with version 4.1, a `PollSkipAdvice` is provided.
Pollers use triggers to determine the time of the next poll.
The `PollSkipAdvice` can be used to suppress (skip) a poll, perhaps because there is some downstream condition that would prevent the message being processed.
To use this advice, you have to provide it with an implementation of a `PollSkipStrategy`.
Starting with version 4.2.5, a `SimplePollSkipStrategy` is provided.
To use it, you can add an instance as a bean to the application context, inject it into a `PollSkipAdvice`, and add that to the poller’s advice chain.
To skip polling, call `skipPolls()`.
To resume polling, call `reset()`.
Version 4.2 added more flexibility in this area.
See [Conditional Pollers for Message Sources](#conditional-pollers).| |---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| This chapter is meant to only give a high-level overview of polling consumers and how they fit into the concept of message channels (see [Message Channels](./channel.html#channel)) and channel adapters (see [Channel Adapter](./channel-adapter.html#channel-adapter)). For more information regarding messaging endpoints in general and polling consumers in particular, see [Message Endpoints](./endpoint.html#endpoint). #### Deferred Acknowledgment Pollable Message Source Starting with version 5.0.1, certain modules provide `MessageSource` implementations that support deferring acknowledgment until the downstream flow completes (or hands off the message to another thread). This is currently limited to the `AmqpMessageSource` and the `KafkaMessageSource`. With these message sources, the `IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK` header (see [`MessageHeaderAccessor` API](./message.html#message-header-accessor)) is added to the message. 
When used with pollable message sources, the value of the header is an instance of `AcknowledgmentCallback`, as the following example shows:

```
@FunctionalInterface
public interface AcknowledgmentCallback {

    void acknowledge(Status status);

    boolean isAcknowledged();

    void noAutoAck();

    default boolean isAutoAck();

    enum Status {

        /**
         * Mark the message as accepted.
         */
        ACCEPT,

        /**
         * Mark the message as rejected.
         */
        REJECT,

        /**
         * Reject the message and requeue so that it will be redelivered.
         */
        REQUEUE

    }

}
```

Not all message sources (for example, a `KafkaMessageSource`) support the `REJECT` status.
For those sources, it is treated the same as `ACCEPT`.

Applications can acknowledge a message at any time, as the following example shows:

```
Message received = source.receive();

...

StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
        .acknowledge(Status.ACCEPT);
```

If the `MessageSource` is wired into a `SourcePollingChannelAdapter`, when the poller thread returns to the adapter after the downstream flow completes, the adapter checks whether the message has already been acknowledged and, if not, sets its status to `ACCEPT` (or `REJECT` if the flow throws an exception).
The status values are defined in the [`AcknowledgmentCallback.Status` enumeration](https://docs.spring.io/spring-integration/api/org/springframework/integration/support/AcknowledgmentCallback.Status.html).

Spring Integration provides `MessageSourcePollingTemplate` to perform ad-hoc polling of a `MessageSource`.
This, too, takes care of setting `ACCEPT` or `REJECT` on the `AcknowledgmentCallback` when the `MessageHandler` callback returns (or throws an exception).
The following example shows how to poll with the `MessageSourcePollingTemplate`:

```
MessageSourcePollingTemplate template =
    new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
    ...
});
```

In both cases (`SourcePollingChannelAdapter` and `MessageSourcePollingTemplate`), you can disable auto ack/nack by calling `noAutoAck()` on the callback.
You might do this if you hand off the message to another thread and wish to acknowledge later.
Not all implementations support this (for example, Apache Kafka does not, because the offset commit has to be performed on the same thread).

#### Conditional Pollers for Message Sources

This section covers how to use conditional pollers.

##### Background

`Advice` objects, in an `advice-chain` on a poller, advise the whole polling task (both message retrieval and processing).
These “around advice” methods do not have access to any context for the poll, only the poll itself.
This is fine for requirements such as making a task transactional or skipping a poll due to some external condition, as discussed earlier.
What if we wish to take some action depending on the result of the `receive` part of the poll or if we want to adjust the poller depending on conditions?
For those instances, Spring Integration offers “Smart” Polling.

##### “Smart” Polling

Version 5.3 introduced the `ReceiveMessageAdvice` interface.
(The `AbstractMessageSourceAdvice` has been deprecated in favor of `default` methods in the `MessageSourceMutator`.)
Any `Advice` objects in the `advice-chain` that implement this interface are applied only to the receive operation: `MessageSource.receive()` and `PollableChannel.receive(timeout)`.
Therefore, they can be applied only to the `SourcePollingChannelAdapter` or `PollingConsumer`.
Such classes implement the following methods:

* `beforeReceive(Object source)`: This method is called before the source’s `receive()` method.
  It lets you examine and reconfigure the source.
  Returning `false` cancels this poll (similar to the `PollSkipAdvice` mentioned earlier).
* `Message afterReceive(Message result, Object source)`: This method is called after the `receive()` method.
Again, you can reconfigure the source or take any action (perhaps depending on the result, which can be `null` if there was no message created by the source).
You can even return a different message.

| |Thread safety

If an advice mutates the source, you should not configure the poller with a `TaskExecutor`.
If an advice mutates the source, such mutations are not thread safe and could cause unexpected results, especially with high frequency pollers.
If you need to process poll results concurrently, consider using a downstream `ExecutorChannel` instead of adding an executor to the poller.| |---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | |Advice Chain Ordering

You should understand how the advice chain is processed during initialization.
`Advice` objects that do not implement `ReceiveMessageAdvice` are applied to the whole poll process and are all invoked first, in order, before any `ReceiveMessageAdvice`.
Then `ReceiveMessageAdvice` objects are invoked in order around the source `receive()` method.
If you have, for example, `Advice` objects `a, b, c, d`, where `b` and `d` are `ReceiveMessageAdvice`, the objects are applied in the following order: `a, c, b, d`.
Also, if a source is already a `Proxy`, the `ReceiveMessageAdvice` is invoked after any existing `Advice` objects.
If you wish to change the order, you must wire up the proxy yourself.|
|---|---|

##### `SimpleActiveIdleReceiveMessageAdvice`

(The previous `SimpleActiveIdleMessageSourceAdvice` for only `MessageSource` is deprecated.)

This advice is a simple implementation of `ReceiveMessageAdvice`.
When used in conjunction with a `DynamicPeriodicTrigger`, it adjusts the polling frequency, depending on whether the previous poll resulted in a message.
The poller must also have a reference to the same `DynamicPeriodicTrigger`.

| |Important: Async Handoff

`SimpleActiveIdleReceiveMessageAdvice` modifies the trigger based on the `receive()` result.
This works only if the advice is called on the poller thread.
It does not work if the poller has a `task-executor`.
To use this advice where you wish to use async operations after the result of a poll, do the async handoff later, perhaps by using an `ExecutorChannel`.|
|---|---|

##### `CompoundTriggerAdvice`

This advice allows the selection of one of two triggers based on whether a poll returns a message or not.
Consider a poller that uses a `CronTrigger`.
`CronTrigger` instances are immutable, so they cannot be altered once constructed.
Consider a use case where we want to use a cron expression to trigger a poll once each hour but, if no message is received, poll once per minute and, when a message is retrieved, revert to using the cron expression.

The advice (and poller) use a `CompoundTrigger` for this purpose.
The trigger’s `primary` trigger can be a `CronTrigger`.
When the advice detects that no message is received, it adds the secondary trigger to the `CompoundTrigger`.
When the `CompoundTrigger` instance’s `nextExecutionTime` method is invoked, it delegates to the secondary trigger, if present.
Otherwise, it delegates to the primary trigger.

The poller must also have a reference to the same `CompoundTrigger`.

The following example shows the configuration for the hourly cron expression with a fallback to every minute:

```
```

| |Important: Async Handoff

`CompoundTriggerAdvice` modifies the trigger based on the `receive()` result.
This works only if the advice is called on the poller thread.
It does not work if the poller has a `task-executor`.
To use this advice where you wish to use async operations after the result of a poll, do the async handoff later, perhaps by using an `ExecutorChannel`.|
|---|---|

##### MessageSource-only Advices

Some advices apply only to `MessageSource.receive()` and do not make sense for a `PollableChannel`.
For this purpose, the `MessageSourceMutator` interface (an extension of `ReceiveMessageAdvice`) is still present.
With its `default` methods, it fully replaces the already deprecated `AbstractMessageSourceAdvice` and should be used in those implementations where only `MessageSource` proxying is expected.
See [Inbound Channel Adapters: Polling Multiple Servers and Directories](./ftp.html#ftp-rotating-server-advice) for more information.

### Channel Adapter

A channel adapter is a message endpoint that enables connecting a single sender or receiver to a message channel.
Spring Integration provides a number of adapters to support various transports, such as JMS, file, HTTP, web services, mail, and more.
Upcoming chapters of this reference guide discuss each adapter.
However, this chapter focuses on the simple but flexible method-invoking channel adapter support.
There are both inbound and outbound adapters, and each may be configured with XML elements provided in the core namespace.
These provide an easy way to extend Spring Integration, as long as you have a method that can be invoked as either a source or a destination.
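To make the idea of a method-invoking adapter concrete, the following plain-Java sketch models both directions: an inbound side that invokes a source method and sends any non-null result as a message, and an outbound side that invokes a consumer method with each payload. It is only an illustration under stated assumptions. `SimpleMessage`, `SimpleChannel`, `pollSource`, and `adaptOutbound` are hypothetical names invented for this sketch, not Spring Integration types, and the real adapters add scheduling, headers, and error handling that are omitted here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical, simplified model of method-invoking channel adapters.
// None of these types are Spring Integration classes.
final class MethodInvokingAdapterSketch {

    /** Minimal stand-in for a message: it carries only a payload. */
    static final class SimpleMessage {
        final Object payload;

        SimpleMessage(Object payload) {
            this.payload = payload;
        }
    }

    /** Minimal stand-in for a channel that hands messages directly to its subscribers. */
    static final class SimpleChannel {
        private final List<Consumer<SimpleMessage>> subscribers = new ArrayList<>();

        void subscribe(Consumer<SimpleMessage> subscriber) {
            subscribers.add(subscriber);
        }

        void send(SimpleMessage message) {
            subscribers.forEach(s -> s.accept(message));
        }
    }

    /**
     * Inbound side: invoke the source method once and, if the result is
     * non-null, wrap it in a message and send it to the channel.
     * Returns true if a message was sent.
     */
    static boolean pollSource(Supplier<?> sourceMethod, SimpleChannel channel) {
        Object result = sourceMethod.get();
        if (result == null) {
            return false; // a null return value produces no message
        }
        channel.send(new SimpleMessage(result));
        return true;
    }

    /** Outbound side: invoke the consumer method with the payload of each message. */
    static void adaptOutbound(SimpleChannel channel, Consumer<Object> consumerMethod) {
        channel.subscribe(message -> consumerMethod.accept(message.payload));
    }

    public static void main(String[] args) {
        SimpleChannel channel = new SimpleChannel();
        List<Object> received = new ArrayList<>();
        adaptOutbound(channel, received::add);

        pollSource(() -> "hello", channel); // non-null result becomes a message
        pollSource(() -> null, channel);    // null result is ignored

        System.out.println(received); // prints [hello]
    }
}
```

In this model, any zero-argument method can act as a source and any single-argument method can act as a destination, which is the property the adapters described in the following sections rely on.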
#### Configuring An Inbound Channel Adapter

An `inbound-channel-adapter` element (a `SourcePollingChannelAdapter` in Java configuration) can invoke any method on a Spring-managed object and send a non-null return value to a `MessageChannel` after converting the method’s output to a `Message`.
When the adapter’s subscription is activated, a poller tries to receive messages from the source.
The poller is scheduled with the `TaskScheduler` according to the provided configuration.
To configure the polling interval or cron expression for an individual channel adapter, you can provide a 'poller' element with one of the scheduling attributes, such as 'fixed-rate' or 'cron'.
The following example defines two `inbound-channel-adapter` instances:

Java DSL

```
@Bean
public IntegrationFlow source1() {
    return IntegrationFlows.from(() -> new GenericMessage<>(...),
                             e -> e.poller(p -> p.fixedRate(5000)))
                ...
                .get();
}

@Bean
public IntegrationFlow source2() {
    return IntegrationFlows.from(() -> new GenericMessage<>(...),
                             e -> e.poller(p -> p.cron("30 * 9-17 * * MON-FRI")))
                ...
                .get();
}
```

Java

```
public class SourceService {

    @InboundChannelAdapter(channel = "channel1", poller = @Poller(fixedRate = "5000"))
    Object method1() {
        ...
    }

    @InboundChannelAdapter(channel = "channel2", poller = @Poller(cron = "30 * 9-17 * * MON-FRI"))
    Object method2() {
        ...
    }
}
```

Kotlin DSL

```
@Bean
fun messageSourceFlow() =
    integrationFlow(
        { GenericMessage<>(...) },
        { poller { it.fixedRate(5000) } }
    ) {
        ...
    }
```

XML

```
```

See also [Channel Adapter Expressions and Scripts](#channel-adapter-expressions-and-scripts).

| |If no poller is provided, then a single default poller must be registered within the context.
See [Endpoint Namespace Support](./endpoint.html#endpoint-namespace) for more detail.| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | |Important: Poller Configuration

All the `inbound-channel-adapter` types are backed by a `SourcePollingChannelAdapter`, which means they contain a poller configuration that polls the `MessageSource` (to invoke a custom method that produces the value that becomes a `Message` payload) based on the configuration specified in the Poller.
The following example shows the configuration of two pollers:

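```
<int:poller max-messages-per-poll="1" fixed-rate="1000"/>

<int:poller max-messages-per-poll="10" fixed-rate="1000"/>
```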

In the first configuration, the polling task is invoked once per poll, and, during each task (poll), the method (which results in the production of the message) is invoked once, based on the `max-messages-per-poll` attribute value.
In the second configuration, the polling task is invoked 10 times per poll or until it returns 'null', thus possibly producing ten messages per poll while each poll happens at one-second intervals.
However, what happens if the configuration looks like the following example:

```
<int:poller fixed-rate="1000"/>
```

Note that there is no `max-messages-per-poll` specified.
As we cover later, the identical poller configuration in the `PollingConsumer` (for example, `service-activator`, `filter`, `router`, and others) would have a default value of `-1` for `max-messages-per-poll`, which means “execute the polling task non-stop unless the polling method returns null (perhaps because there are no more messages in the `QueueChannel`)” and then sleep for one second.

However, in the `SourcePollingChannelAdapter`, it is a bit different.
The default value for `max-messages-per-poll` is `1`, unless you explicitly set it to a negative value (such as `-1`).
This makes sure that the poller can react to lifecycle events (such as start and stop) and prevents it from potentially spinning in an infinite loop if the implementation of the custom method of the `MessageSource` has a potential to never return null and happens to be non-interruptible.

However, if you are sure that your method can return null and you need to poll for as many messages as are available during each poll, you should explicitly set `max-messages-per-poll` to a negative value, as the following example shows:

```
<int:poller max-messages-per-poll="-1" fixed-rate="1000"/>
```

Starting with version 5.5, a `0` value for `max-messages-per-poll` has a special meaning: the `MessageSource.receive()` call is skipped altogether, which effectively pauses this inbound channel adapter until `maxMessagesPerPoll` is changed to a non-zero value at a later time (for example, through a Control Bus).
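The three cases described above (a positive limit, a negative value, and `0`) can be sketched as a small loop. This is a simplified stand-in written for illustration, not the `SourcePollingChannelAdapter` implementation. The `PollLoopSketch` class and its `pollOnce` method are hypothetical names, and the source is modeled as a plain `Supplier` that returns `null` when it has nothing to offer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

// Simplified stand-in for a single polling task; NOT the Spring Integration implementation.
final class PollLoopSketch {

    /**
     * Runs one poll and returns the payloads it produced.
     * maxMessagesPerPoll > 0: invoke the source at most that many times.
     * maxMessagesPerPoll < 0: invoke the source until it returns null.
     * maxMessagesPerPoll == 0: skip the source entirely (the adapter is paused).
     */
    static <T> List<T> pollOnce(Supplier<T> source, int maxMessagesPerPoll) {
        List<T> produced = new ArrayList<>();
        if (maxMessagesPerPoll == 0) {
            return produced; // paused: the source is never invoked
        }
        int count = 0;
        while (maxMessagesPerPoll < 0 || count < maxMessagesPerPoll) {
            T payload = source.get();
            if (payload == null) {
                break; // nothing more to receive during this poll
            }
            produced.add(payload);
            count++;
        }
        return produced;
    }

    public static void main(String[] args) {
        Queue<String> backlog = new ArrayDeque<>(List.of("a", "b", "c", "d", "e"));
        Supplier<String> source = backlog::poll; // returns null once the queue is empty

        System.out.println(pollOnce(source, 1));  // prints [a]
        System.out.println(pollOnce(source, 0));  // prints []
        System.out.println(pollOnce(source, 2));  // prints [b, c]
        System.out.println(pollOnce(source, -1)); // prints [d, e]
    }
}
```

The sketch also shows why an unlimited poll is risky for a source that never returns `null`: with a negative limit, the loop ends only when the source does.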

Also see [Global Default Poller](./endpoint.html#global-default-poller) for more information.|
|---|---|

#### Configuring An Outbound Channel Adapter

An `outbound-channel-adapter` element (a `@ServiceActivator` for Java configuration) can also connect a `MessageChannel` to any POJO consumer method that should be invoked with the payload of messages sent to that channel.
The following example shows how to define an outbound channel adapter:

Java DSL

```
@Bean
public IntegrationFlow outboundChannelAdapterFlow(MyPojo myPojo) {
    return f -> f
             .handle(myPojo, "handle");
}
```

Java

```
public class MyPojo {

    @ServiceActivator(channel = "channel1")
    void handle(Object payload) {
        ...
    }

}
```

Kotlin DSL

```
@Bean
fun outboundChannelAdapterFlow(myPojo: MyPojo) =
    integrationFlow {
        handle(myPojo, "handle")
    }
```

XML

```
```

If the channel being adapted is a `PollableChannel`, you must provide a poller sub-element (the `@Poller` sub-annotation on the `@ServiceActivator`), as the following example shows:

Java

```
public class MyPojo {

    @ServiceActivator(channel = "channel1", poller = @Poller(fixedRate = "3000"))
    void handle(Object payload) {
        ...
    }

}
```

XML

```
```

You should use a `ref` attribute if the POJO consumer implementation can be reused in other `outbound-channel-adapter` definitions.
However, if the consumer implementation is referenced by only a single `outbound-channel-adapter` definition, you can define it as an inner bean, as the following example shows:

```
```

| |Using both the `ref` attribute and an inner handler definition in the same `outbound-channel-adapter` configuration is not allowed, as it creates an ambiguous condition.
Such a configuration results in an exception being thrown.|
|---|---|

Any channel adapter can be created without a `channel` reference, in which case it implicitly creates an instance of `DirectChannel`.
The created channel’s name matches the `id` attribute of the `inbound-channel-adapter` or `outbound-channel-adapter` element.
Therefore, if `channel` is not provided, `id` is required.

#### Channel Adapter Expressions and Scripts

Like many other Spring Integration components, the `inbound-channel-adapter` and `outbound-channel-adapter` also provide support for SpEL expression evaluation.
To use SpEL, provide the expression string in the 'expression' attribute instead of providing the 'ref' and 'method' attributes that are used for method-invocation on a bean.
When an expression is evaluated, it follows the same contract as method-invocation: the expression for an `inbound-channel-adapter` generates a message any time the evaluation result is a non-null value, while the expression for an `outbound-channel-adapter` must be the equivalent of a void-returning method invocation.
Starting with Spring Integration 3.0, an `` can also be configured with a SpEL `` (or even with a `