# Message 

### Message

The Spring Integration `Message` is a generic container for data.
Any object can be provided as the payload, and each `Message` instance includes headers containing user-extensible properties as key-value pairs.

#### The `Message` Interface

The following listing shows the definition of the `Message` interface:

public interface Message<T> {

    T getPayload();

    MessageHeaders getHeaders();

}

The `Message` interface is a core part of the API.
By encapsulating the data in a generic wrapper, the messaging system can pass it around without any knowledge of the data’s type.
As an application evolves to support new types or when the types themselves are modified or extended, the messaging system is not affected.
On the other hand, when some component in the messaging system does require access to information about the `Message`, such metadata can typically be stored in and retrieved from the message headers.

#### Message Headers

Just as Spring Integration lets any `Object` be used as the payload of a `Message`, it also supports any `Object` types as header values.
In fact, the `MessageHeaders` class implements the `java.util.Map` interface, as the following class definition shows:

public final class MessageHeaders implements Map<String, Object>, Serializable {
    // ...
}

|   |Even though the `MessageHeaders` class implements `Map`, it is effectively a read-only implementation.<br/>Any attempt to `put` a value in the `Map` results in an `UnsupportedOperationException`.<br/>The same applies to `remove` and `clear`.<br/>Since messages may be passed to multiple consumers, the structure of the `Map` cannot be modified.<br/>Likewise, the message’s payload `Object` cannot be `set` after the initial creation.<br/>However, the mutability of the header values themselves (or the payload `Object`) is intentionally left as a decision for the framework user.|
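
The read-only contract described in the note can be illustrated with plain JDK collections. The following is a minimal standalone sketch, not the framework's implementation: `ReadOnlyHeadersSketch` and its methods are hypothetical names, and an unmodifiable `HashMap` copy stands in for `MessageHeaders`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the read-only header contract, built on JDK maps only.
class ReadOnlyHeadersSketch {

    // Takes a defensive copy, so later changes to 'source' are not visible,
    // and wraps it so that put, remove, and clear are rejected.
    static Map<String, Object> readOnlyCopy(Map<String, Object> source) {
        return Collections.unmodifiableMap(new HashMap<>(source));
    }

    // Returns true when a structural modification is rejected.
    static boolean putIsRejected(Map<String, Object> headers) {
        try {
            headers.put("extra", "value");
            return false;
        }
        catch (UnsupportedOperationException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> source = new HashMap<>();
        source.put("tags", new ArrayList<>(Arrays.asList("a")));
        Map<String, Object> headers = readOnlyCopy(source);

        System.out.println(putIsRejected(headers)); // prints true

        // The map structure is fixed, but a mutable header value can still change.
        @SuppressWarnings("unchecked")
        List<String> tags = (List<String>) headers.get("tags");
        tags.add("b");
        System.out.println(headers.get("tags")); // prints [a, b]
    }
}
```

The second half of `main` shows why the choice of immutable header values is left to you: the wrapper protects only the map's structure, not the objects stored in it.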

As an implementation of `Map`, the headers can be retrieved by calling `get(..)` with the name of the header.
Alternatively, you can provide the expected `Class` as an additional parameter.
Even better, when retrieving one of the pre-defined values, convenient getters are available.
The following example shows each of these three options:

Object someValue = message.getHeaders().get("someKey");

CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);

Long timestamp = message.getHeaders().getTimestamp();
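
To make the typed lookup concrete, here is a minimal sketch of how a `get(key, Class)` style method can be written over a plain map. `TypedHeaderLookup` is a hypothetical helper, and reporting a type mismatch as an `IllegalArgumentException` is an assumption of this sketch rather than a statement about the framework.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that mimics a typed header lookup over a plain map.
class TypedHeaderLookup {

    // Returns the header cast to 'type', or null when the header is absent.
    // Assumption: a value of the wrong type is reported as IllegalArgumentException.
    static <T> T get(Map<String, Object> headers, String key, Class<T> type) {
        Object value = headers.get(key);
        if (value == null) {
            return null;
        }
        if (!type.isInstance(value)) {
            throw new IllegalArgumentException("Header '" + key + "' is a "
                    + value.getClass().getName() + ", not a " + type.getName());
        }
        return type.cast(value);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("customerId", 42L);

        Long id = get(headers, "customerId", Long.class);
        System.out.println(id); // prints 42
        System.out.println(get(headers, "missing", String.class)); // prints null
    }
}
```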

The following table describes the pre-defined message headers:

|                   Header Name                    |                            Header Type                             |                                                                                                                                  Usage                                                                                                                                  |
|--------------------------------------------------|--------------------------------------------------------------------|-------|
|        ```<br/> MessageHeaders.ID<br/>```        |                  ```<br/> java.util.UUID<br/>```                   |                                                                                          An identifier for this message instance.<br/>Changes each time a message is mutated.                                                                                           |
|  ```<br/> MessageHeaders.<br/>TIMESTAMP<br/>```  |                  ```<br/> java.lang.Long<br/>```                   |                                                                                              The time the message was created.<br/>Changes each time a message is mutated.                                                                                              |
|```<br/> MessageHeaders.<br/>REPLY_CHANNEL<br/>```|```<br/> java.lang.Object<br/>(String or<br/>MessageChannel)<br/>```|A channel to which a reply (if any) is sent when no explicit output channel is configured and there is no `ROUTING_SLIP` or the `ROUTING_SLIP` is exhausted.<br/>If the value is a `String`, it must represent a bean name or have been generated by a `ChannelRegistry`.|
|```<br/> MessageHeaders.<br/>ERROR_CHANNEL<br/>```|```<br/> java.lang.Object<br/>(String or<br/>MessageChannel)<br/>```|A channel to which errors are sent.<br/>If the value is a `String`, it must represent a bean name or have been generated by a `ChannelRegistry`.|

Many inbound and outbound adapter implementations also provide or expect certain headers, and you can configure additional user-defined headers.
Constants for these headers can be found in the modules where such headers exist (for example, `AmqpHeaders`, `JmsHeaders`, and so on).

##### `MessageHeaderAccessor` API

Starting with Spring Framework 4.0 and Spring Integration 4.0, the core messaging abstraction has been moved to the `spring-messaging` module, and the `MessageHeaderAccessor` API has been introduced to provide additional abstraction over messaging implementations.
All (core) Spring Integration-specific message headers constants are now declared in the `IntegrationMessageHeaderAccessor` class.
The following table describes the pre-defined message headers:

|                                 Header Name                                  |                          Header Type                          |                                                                                                                           Usage                                                                                                                           |
|------------------------------------------------------------------------------|---------------------------------------------------------------|-------|
|    ```<br/> IntegrationMessageHeaderAccessor.<br/>CORRELATION_ID<br/>```     |               ```<br/> java.lang.Object<br/>```               |                                                                                                          Used to correlate two or more messages.                                                                                                          |
|    ```<br/> IntegrationMessageHeaderAccessor.<br/>SEQUENCE_NUMBER<br/>```    |              ```<br/> java.lang.Integer<br/>```               |Usually a sequence number within a group of messages that has a `SEQUENCE_SIZE`, but it can also be used in a `<resequencer/>` to resequence an unbounded group of messages.|
|     ```<br/> IntegrationMessageHeaderAccessor.<br/>SEQUENCE_SIZE<br/>```     |              ```<br/> java.lang.Integer<br/>```               |                                                                                               The number of messages within a group of correlated messages.                                                                                               |
|    ```<br/> IntegrationMessageHeaderAccessor.<br/>EXPIRATION_DATE<br/>```    |                ```<br/> java.lang.Long<br/>```                |                            Indicates when a message is expired.<br/>Not used by the framework directly but can be set with a header enricher and used in a `<filter/>` that is configured with an `UnexpiredMessageSelector`.                             |
|       ```<br/> IntegrationMessageHeaderAccessor.<br/>PRIORITY<br/>```        |              ```<br/> java.lang.Integer<br/>```               |                                                                                                Message priority — for example, within a `PriorityChannel`.                                                                                                |
|   ```<br/> IntegrationMessageHeaderAccessor.<br/>DUPLICATE_MESSAGE<br/>```   |              ```<br/> java.lang.Boolean<br/>```               |                               True if a message was detected as a duplicate by an idempotent receiver interceptor.<br/>See [Idempotent Receiver Enterprise Integration Pattern](./handler-advice.html#idempotent-receiver).                               |
|  ```<br/> IntegrationMessageHeaderAccessor.<br/>CLOSEABLE_RESOURCE<br/>```   |              ```<br/><br/>```               |          This header is present if the message is associated with a `Closeable` that should be closed when message processing is complete.<br/>An example is the `Session` associated with a streamed file transfer using FTP, SFTP, and so on.           |
|   ```<br/> IntegrationMessageHeaderAccessor.<br/>DELIVERY_ATTEMPT<br/>```    |```<br/> java.util.concurrent.<br/>atomic.AtomicInteger<br/>```|If a message-driven channel adapter supports the configuration of a `RetryTemplate`, this header contains the current delivery attempt.|
|```<br/> IntegrationMessageHeaderAccessor.<br/>ACKNOWLEDGMENT_CALLBACK<br/>```|```<br/><br/>Acknowledgment<br/>Callback<br/>```|If an inbound endpoint supports it, a call back to accept, reject, or requeue a message.<br/>See [Deferred Acknowledgment Pollable Message Source](./polling-consumer.html#deferred-acks-message-source) and [MQTT Manual Acks](./mqtt.html#mqtt-ack-mode).|

Convenient typed getters for some of these headers are provided on the `IntegrationMessageHeaderAccessor` class, as the following example shows:

IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();

The following table describes headers that also appear in the `IntegrationMessageHeaderAccessor` but are generally not used by user code (that is, they are generally used by internal parts of Spring Integration — their inclusion here is for completeness):

|                              Header Name                              |                       Header Type                        |                                                               Usage                                                                |
|-----------------------------------------------------------------------|----------------------------------------------------------|-------|
|```<br/> IntegrationMessageHeaderAccessor.<br/>SEQUENCE_DETAILS<br/>```|    ```<br/> java.util.<br/>List<List<Object>><br/>```    |A stack of correlation data used when nested correlation is needed (for example, `splitter→…→splitter→…→aggregator→…→aggregator`).|
|  ```<br/> IntegrationMessageHeaderAccessor.<br/>ROUTING_SLIP<br/>```  |```<br/> java.util.<br/>Map<List<Object>, Integer><br/>```|                                          See [Routing Slip](./router.html#routing-slip).                                           |

##### Message ID Generation

As a message moves through an application, a new message ID is assigned each time it is mutated (for example, by a transformer).
The message ID is a `UUID`.
Beginning with Spring Integration 3.0, the default strategy used for ID generation is more efficient than the previous `java.util.UUID.randomUUID()` implementation.
It uses simple random numbers based on a secure random seed instead of creating a secure random number each time.

A different UUID generation strategy can be selected by declaring a bean that implements `org.springframework.util.IdGenerator` in the application context.

|   |Only one UUID generation strategy can be used in a classloader.<br/>This means that, if two or more application contexts run in the same classloader, they share the same strategy.<br/>If one of the contexts changes the strategy, it is used by all contexts.<br/>If two or more contexts in the same classloader declare a bean of type `org.springframework.util.IdGenerator`, they must all be an instance of the same class.<br/>Otherwise, the context attempting to replace a custom strategy fails to initialize.<br/>If the strategy is the same, but parameterized, the strategy in the first context to be initialized is used.|

In addition to the default strategy, two additional `IdGenerators` are provided.
`org.springframework.util.JdkIdGenerator` uses the previous `UUID.randomUUID()` mechanism.
You can use `` when a UUID is not really needed and a simple incrementing value is sufficient.
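
As an illustration of the "simple incrementing value" idea, the following sketch derives each `UUID` from a counter. `IncrementingIdSketch` is a hypothetical class written for this example. In an application it would implement `org.springframework.util.IdGenerator` and be declared as a bean, but the interface is left out here so that the sketch compiles with the JDK alone.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical incrementing ID strategy; not one of the generators shipped
// with the framework.
class IncrementingIdSketch {

    private final AtomicLong counter = new AtomicLong();

    // Produces UUIDs whose low 64 bits are a running counter. The IDs are
    // unique only within this instance and are predictable by design.
    UUID generateId() {
        return new UUID(0L, counter.incrementAndGet());
    }

    public static void main(String[] args) {
        IncrementingIdSketch generator = new IncrementingIdSketch();
        System.out.println(generator.generateId()); // prints 00000000-0000-0000-0000-000000000001
        System.out.println(generator.generateId()); // prints 00000000-0000-0000-0000-000000000002
    }
}
```

Such a strategy avoids random number generation entirely, at the cost of IDs that are not unique across JVMs or restarts.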

##### Read-only Headers

The `MessageHeaders.ID` and `MessageHeaders.TIMESTAMP` are read-only headers and cannot be overridden.

Since version 4.3.2, the `MessageBuilder` provides the `readOnlyHeaders(String... readOnlyHeaders)` API to customize a list of headers that should not be copied from an upstream `Message`.
Only the `MessageHeaders.ID` and `MessageHeaders.TIMESTAMP` are read only by default.
The global `spring.integration.readOnly.headers` property (see [Global Properties](./configuration.html#global-properties)) is provided to customize `DefaultMessageBuilderFactory` for framework components.
This can be useful when you do not want some out-of-the-box headers to be populated, such as `contentType` by the `ObjectToJsonTransformer` (see [JSON Transformers](./transformer.html#json-transformers)).

When you build a new message with `MessageBuilder`, any read-only header is ignored and an `INFO` message is written to the log.

Starting with version 5.0, [Messaging Gateway](./gateway.html#gateway), [Header Enricher](./content-enrichment.html#header-enricher), [Content Enricher](./content-enrichment.html#payload-enricher) and [Header Filter](./transformer.html#header-filter) do not let you configure the `MessageHeaders.ID` and `MessageHeaders.TIMESTAMP` header names when `DefaultMessageBuilderFactory` is used, and they throw `BeanInitializationException`.

##### Header Propagation

When messages are processed (and modified) by message-producing endpoints (such as a [service activator](./service-activator.html#service-activator)), in general, inbound headers are propagated to the outbound message.
One exception to this is a [transformer](./transformer.html#transformer), when a complete message is returned to the framework.
In that case, the user code is responsible for the entire outbound message.
When a transformer just returns the payload, the inbound headers are propagated.
Also, a header is only propagated if it does not already exist in the outbound message, letting you change header values as needed.
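
The "only if absent" rule can be sketched with two plain maps. This is a standalone illustration under the assumption that headers behave like map entries; `HeaderPropagationSketch` and `propagate` are hypothetical names, not framework API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the propagation rule using JDK maps only.
class HeaderPropagationSketch {

    // Copies each inbound header into the outbound headers unless the outbound
    // message already defines it, so explicitly set outbound values win.
    static Map<String, Object> propagate(Map<String, Object> inbound, Map<String, Object> outbound) {
        Map<String, Object> result = new LinkedHashMap<>(outbound);
        inbound.forEach(result::putIfAbsent);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> inbound = new LinkedHashMap<>();
        inbound.put("foo", "bar");
        inbound.put("traceId", "abc");

        Map<String, Object> outbound = new LinkedHashMap<>();
        outbound.put("foo", "changed");

        System.out.println(propagate(inbound, outbound)); // prints {foo=changed, traceId=abc}
    }
}
```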

Starting with version 4.3.10, you can configure message handlers (that modify messages and produce output) to suppress the propagation of specific headers.
To configure the headers you do not want to be copied, call the `setNotPropagatedHeaders()` or `addNotPropagatedHeaders()` methods on the `AbstractMessageProducingHandler` abstract class.

You can also globally suppress propagation of specific message headers by setting the `readOnlyHeaders` property in `META-INF/` to a comma-delimited list of headers.

Starting with version 5.0, the `setNotPropagatedHeaders()` implementation on the `AbstractMessageProducingHandler` applies simple patterns (`xxx*`, `*xxx*`, `*xxx`, or `xxx*yyy`) to allow filtering headers with a common suffix or prefix.
See the `PatternMatchUtils` Javadoc for more information.
When one of the patterns is `*` (asterisk), no headers are propagated.
All other patterns are ignored.
In that case, the service activator behaves the same way as a transformer and any required headers must be supplied in the `Message` returned from the service method.
The `notPropagatedHeaders()` option is available in the `ConsumerEndpointSpec` for the Java DSL.
It is also available for XML configuration of the `<service-activator>` component as a `not-propagated-headers` attribute.
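
The effect of the simple patterns can be sketched without the framework. The example below is an approximation written for illustration: `NotPropagatedHeadersSketch` is a hypothetical class, and it translates each pattern into a regular expression rather than using `PatternMatchUtils`, so edge cases may differ from the real matcher.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical approximation of simple-pattern header suppression.
class NotPropagatedHeadersSketch {

    // Treats '*' as "any run of characters" and everything else literally.
    static boolean matches(String pattern, String headerName) {
        String[] literals = pattern.split("\\*", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < literals.length; i++) {
            if (i > 0) {
                regex.append(".*");
            }
            regex.append(Pattern.quote(literals[i]));
        }
        return Pattern.matches(regex.toString(), headerName);
    }

    // Returns the inbound headers that do not match any suppression pattern.
    static Map<String, Object> filter(Map<String, Object> inbound, String... notPropagated) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : inbound.entrySet()) {
            boolean suppressed = false;
            for (String pattern : notPropagated) {
                if (matches(pattern, entry.getKey())) {
                    suppressed = true;
                    break;
                }
            }
            if (!suppressed) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> inbound = new LinkedHashMap<>();
        inbound.put("x-trace", 1);
        inbound.put("x-span", 2);
        inbound.put("contentType", "json");

        System.out.println(filter(inbound, "x-*")); // prints {contentType=json}
        System.out.println(filter(inbound, "*"));   // prints {}
    }
}
```

The second call shows the special case from the text: a lone `*` matches every header name, so nothing is propagated.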

|   |Header propagation suppression does not apply to those endpoints that do not modify the message, such as [bridges](./bridge.html#bridge) and [routers](./router.html#router).|

#### Message Implementations

The base implementation of the `Message` interface is `GenericMessage<T>`, and it provides two constructors, shown in the following listing:

new GenericMessage<T>(T payload);

new GenericMessage<T>(T payload, Map<String, Object> headers);

When a `Message` is created, a random unique ID is generated.
The constructor that accepts a `Map` of headers copies the provided headers to the newly created `Message`.

There is also a convenient implementation of `Message` designed to communicate error conditions.
This implementation takes a `Throwable` object as its payload, as the following example shows:

ErrorMessage message = new ErrorMessage(someThrowable);

Throwable t = message.getPayload();

Note that this implementation takes advantage of the fact that the `GenericMessage` base class is parameterized.
Therefore, as shown in both examples, no casting is necessary when retrieving the `Message` payload `Object`.

#### The `MessageBuilder` Helper Class

You may notice that the `Message` interface defines retrieval methods for its payload and headers but provides no setters.
The reason for this is that a `Message` cannot be modified after its initial creation.
Therefore, when a `Message` instance is sent to multiple consumers (for example, through a publish-subscribe channel), if one of those consumers needs to send a reply with a different payload type, it must create a new `Message`.
As a result, the other consumers are not affected by those changes.
Keep in mind that multiple consumers may access the same payload instance or header value, and whether such an instance is itself immutable is a decision left to you.
In other words, the contract for `Message` instances is similar to that of an unmodifiable `Collection`, and the `MessageHeaders` map further exemplifies that.
Even though the `MessageHeaders` class implements `java.util.Map`, any attempt to invoke a `put` operation (or `remove` or `clear`) on a `MessageHeaders` instance results in an `UnsupportedOperationException`.

Rather than requiring the creation and population of a `Map` to pass into the `GenericMessage` constructor, Spring Integration provides a far more convenient way to construct messages: `MessageBuilder`.
The `MessageBuilder` provides two factory methods for creating `Message` instances from either an existing `Message` or with a payload `Object`.
When building from an existing `Message`, the headers and payload of that `Message` are copied to the new `Message`, as the following example shows:

Message<String> message1 = MessageBuilder.withPayload("test")
        .setHeader("foo", "bar")
        .build();

Message<String> message2 = MessageBuilder.fromMessage(message1).build();

assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));

If you need to create a `Message` with a new payload but still want to copy the headers from an existing `Message`, you can use one of the 'copy' methods, as the following example shows:

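Message<String> message3 = MessageBuilder.withPayload("test3")
        .copyHeaders(message1.getHeaders())
        .build();

Message<String> message4 = MessageBuilder.withPayload("test4")
        .setHeader("foo", 123)
        .copyHeadersIfAbsent(message1.getHeaders())
        .build();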

assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));

Note that the `copyHeadersIfAbsent` method does not overwrite existing values.
Also, in the preceding example, you can see how to set any user-defined header with `setHeader`.
Finally, there are `set` methods available for the predefined headers as well as a non-destructive method for setting any header (`MessageHeaders` also defines constants for the pre-defined header names).

You can also use `MessageBuilder` to set the priority of messages, as the following example shows:

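Message<Integer> importantMessage = MessageBuilder.withPayload(99)
        .setPriority(5)
        .build();

// MessageHeaders has no priority getter, so read the header through the accessor.
assertEquals(Integer.valueOf(5), new IntegrationMessageHeaderAccessor(importantMessage).getPriority());

// setHeader overwrites the copied priority; setHeaderIfAbsent would leave it at 5.
Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
        .setHeader(IntegrationMessageHeaderAccessor.PRIORITY, 2)
        .build();

assertEquals(Integer.valueOf(2), new IntegrationMessageHeaderAccessor(lessImportantMessage).getPriority());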

The `priority` header is considered only when using a `PriorityChannel` (as described in the next chapter).
It is defined as a `java.lang.Integer`.
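
To show what "considered" can mean in practice, here is a rough standalone sketch of priority-based ordering with a JDK `PriorityQueue`. It is not the `PriorityChannel` implementation: `PriorityOrderingSketch` and `Item` are hypothetical, and the sketch assumes that higher values are received first and that a missing priority counts as 0.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration of ordering by a priority header.
class PriorityOrderingSketch {

    // Minimal stand-in for a message: a payload plus an optional priority header.
    static final class Item {
        final String payload;
        final Integer priority; // null when the header is absent

        Item(String payload, Integer priority) {
            this.payload = payload;
            this.priority = priority;
        }
    }

    // Assumption: higher values come first and a missing priority counts as 0.
    static final Comparator<Item> HIGHEST_FIRST =
            Comparator.comparingInt((Item item) -> item.priority == null ? 0 : item.priority)
                    .reversed();

    // Returns the payloads in the order a priority-ordered queue would release them.
    // Items with equal priority may come out in any order in this sketch.
    static List<String> receiveOrder(List<Item> sent) {
        PriorityQueue<Item> queue = new PriorityQueue<>(HIGHEST_FIRST);
        queue.addAll(sent);
        List<String> received = new ArrayList<>();
        while (!queue.isEmpty()) {
            received.add(queue.poll().payload);
        }
        return received;
    }

    public static void main(String[] args) {
        List<Item> sent = new ArrayList<>();
        sent.add(new Item("routine", 2));
        sent.add(new Item("urgent", 9));
        sent.add(new Item("unmarked", null));

        System.out.println(receiveOrder(sent)); // prints [urgent, routine, unmarked]
    }
}
```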