message-publishing.md 15.8 KB
Newer Older
茶陵後's avatar
茶陵後 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354
# Message Publishing

## Message Publishing

The (Aspect-oriented Programming) AOP message publishing feature lets you construct and send a message as a by-product of a method invocation.
For example, imagine you have a component and, every time the state of this component changes, you want to be notified by a message.
The easiest way to send such notifications is to send a message to a dedicated channel, but how would you connect the method invocation that changes the state of the object to a message sending process, and how should the notification message be structured?
The AOP message publishing feature handles these responsibilities with a configuration-driven approach.

### Message Publishing Configuration

Spring Integration provides two approaches: XML configuration and annotation-driven (Java) configuration.

#### Annotation-driven Configuration with the `@Publisher` Annotation

The annotation-driven approach lets you annotate any method with the `@Publisher` annotation to specify a 'channel' attribute.
Starting with version 5.1, to switch this functionality on, you must use the `@EnablePublisher` annotation on some `@Configuration` class.
See [Configuration and `@EnableIntegration`](./overview.html#configuration-enable-integration) for more information.
The message is constructed from the return value of the method invocation and sent to the channel specified by the 'channel' attribute.
To further manage message structure, you can also use a combination of both `@Payload` and `@Header` annotations.

Internally, this message publishing feature of Spring Integration uses both Spring AOP by defining `PublisherAnnotationAdvisor` and the Spring Expression Language (SpEL), giving you considerable flexibility and control over the structure of the `Message` it publishes.

The `PublisherAnnotationAdvisor` defines and binds the following variables:

* `#return`: Binds to a return value, letting you reference it or its attributes (for example, `#return.something`, where 'something' is an attribute of the object bound to `#return`)

* `#exception`: Binds to an exception if one is thrown by the method invocation

* `#args`: Binds to method arguments so that you can extract individual arguments by name (for example, `#args.fname`)

Consider the following example:

```
@Publisher
public String defaultPayload(String fname, String lname) {
  return fname + " " + lname;
}
```

In the preceding example, the message is constructed with the following structure:

* The message payload is the return type and value of the method.
  This is the default.

* A newly constructed message is sent to a default publisher channel that is configured with an annotation post processor (covered later in this section).

The following example is the same as the preceding example, except that it does not use a default publishing channel:

```
@Publisher(channel="testChannel")
public String defaultPayload(String fname, @Header("last") String lname) {
  return fname + " " + lname;
}
```

Instead of using a default publishing channel, we specify the publishing channel by setting the 'channel' attribute of the `@Publisher` annotation.
We also add a `@Header` annotation, which results in the message header named 'last' having the same value as the 'lname' method parameter.
That header is added to the newly constructed message.

The following example is almost identical to the preceding example:

```
@Publisher(channel="testChannel")
@Payload
public String defaultPayloadButExplicitAnnotation(String fname, @Header String lname) {
  return fname + " " + lname;
}
```

The only difference is that we use a `@Payload` annotation on the method to explicitly specify that the return value of the method should be used as the payload of the message.

The following example expands on the previous configuration by using the Spring Expression Language in the `@Payload` annotation to further instruct the framework about how the message should be constructed:

```
@Publisher(channel="testChannel")
@Payload("#return + #args.lname")
public String setName(String fname, String lname, @Header("x") int num) {
  return fname + " " + lname;
}
```

In the preceding example, the message is a concatenation of the return value of the method invocation and the 'lname' input argument.
The Message header named 'x' has its value determined by the 'num' input argument.
That header is added to the newly constructed message.

```
@Publisher(channel="testChannel")
public String argumentAsPayload(@Payload String fname, @Header String lname) {
  return fname + " " + lname;
}
```

In the preceding example, you see another usage of the `@Payload` annotation.
Here, we annotate a method argument that becomes the payload of the newly constructed message.

As with most other annotation-driven features in Spring, you need to register a post-processor (`PublisherAnnotationBeanPostProcessor`).
The following example shows how to do so:

```
<bean class="org.springframework.integration.aop.PublisherAnnotationBeanPostProcessor"/>
```

For a more concise configuration, you can instead use namespace support, as the following example shows:

```
<int:annotation-config>
    <int:enable-publisher default-publisher-channel="defaultChannel"/>
</int:annotation-config>
```

For Java configuration, you must use the `@EnablePublisher` annotation, as the following example shows:

```
@Configuration
@EnableIntegration
@EnablePublisher("defaultChannel")
public class IntegrationConfiguration {
    ...
}
```

Starting with version 5.1.3, the `<int:enable-publisher>` component, as well as the `@EnablePublisher` annotation have the `proxy-target-class` and `order` attributes for tuning the `ProxyFactory` configuration.

Similar to other Spring annotations (`@Component`, `@Scheduled`, and so on), you can also use `@Publisher` as a meta-annotation.
This means that you can define your own annotations that are treated in the same way as the `@Publisher` itself.
The following example shows how to do so:

```
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Publisher(channel="auditChannel")
public @interface Audit {
...
}
```

In the preceding example, we define the `@Audit` annotation, which is itself annotated with `@Publisher`.
Also note that you can define a `channel` attribute on the meta-annotation to encapsulate where messages are sent inside of this annotation.
Now you can annotate any method with the `@Audit` annotation, as the following example shows:

```
@Audit
public String test() {
    return "Hello";
}
```

In the preceding example, every invocation of the `test()` method results in a message with a payload created from its return value.
Each message is sent to the channel named `auditChannel`.
One of the benefits of this technique is that you can avoid the duplication of the same channel name across multiple annotations.
You also can provide a level of indirection between your own, potentially domain-specific, annotations and those provided by the framework.

You can also annotate the class, which lets you apply the properties of this annotation on every public method of that class, as the following example shows:

```
@Audit
static class BankingOperationsImpl implements BankingOperations {

  public String debit(String amount) {
     . . .

  }

  public String credit(String amount) {
     . . .
  }

}
```

#### XML-based Approach with the `<publishing-interceptor>` element

The XML-based approach lets you configure the same AOP-based message publishing functionality as a namespace-based configuration of a `MessagePublishingInterceptor`.
It certainly has some benefits over the annotation-driven approach, since it lets you use AOP pointcut expressions, thus possibly intercepting multiple methods at once or intercepting and publishing methods to which you do not have the source code.

To configure message publishing with XML, you need only do the following two things:

* Provide configuration for `MessagePublishingInterceptor` by using the `<publishing-interceptor>` XML element.

* Provide AOP configuration to apply the `MessagePublishingInterceptor` to managed objects.

The following example shows how to configure a `publishing-interceptor` element:

```
<aop:config>
  <aop:advisor advice-ref="interceptor" pointcut="bean(testBean)" />
</aop:config>
<publishing-interceptor id="interceptor" default-channel="defaultChannel">
  <method pattern="echo" payload="'Echoing: ' + #return" channel="echoChannel">
    <header name="things" value="something"/>
  </method>
  <method pattern="repl*" payload="'Echoing: ' + #return" channel="echoChannel">
    <header name="things" expression="'something'.toUpperCase()"/>
  </method>
  <method pattern="echoDef*" payload="#return"/>
</publishing-interceptor>
```

The `<publishing-interceptor>` configuration looks rather similar to the annotation-based approach, and it also uses the power of the Spring Expression Language.

In the preceding example, the execution of the `echo` method of a `testBean` renders a `Message` with the following structure:

* The `Message` payload is of type `String` with the following content: `Echoing: [value]`, where `value` is the value returned by an executed method.

* The `Message` has a header with a name of `things` and a value of `something`.

* The `Message` is sent to `echoChannel`.

The second method is very similar to the first.
Here, every method that begins with 'repl' renders a `Message` with the following structure:

* The `Message` payload is the same as in the preceding sample.

* The `Message` has a header named `things` whose value is the result of the SpEL expression `'something'.toUpperCase()`.

* The `Message` is sent to `echoChannel`.

The second method, mapping the execution of any method that begins with `echoDef`, produces a `Message` with the following structure:

* The `Message` payload is the value returned by an executed method.

* Since the `channel` attribute is not provided, the `Message` is sent to the `defaultChannel` defined by the `publisher`.

For simple mapping rules you can rely on the `publisher` defaults, as the following example shows:

```
<publishing-interceptor id="anotherInterceptor"/>
```

The preceding example maps the return value of every method that matches the pointcut expression to a payload and is sent to a `default-channel`.
If you do not specify the `defaultChannel` (as the preceding example does not do), the messages are sent to the global `nullChannel` (the equivalent of `/dev/null`).

##### Asynchronous Publishing

Publishing occurs in the same thread as your component’s execution.
So, by default, it is synchronous.
This means that the entire message flow has to wait until the publisher’s flow completes.
However, developers often want the complete opposite: to use this message-publishing feature to initiate asynchronous flows.
For example, you might host a service (HTTP, WS, and so on) which receives a remote request.
You may want to send this request internally into a process that might take a while.
However you may also want to reply to the user right away.
So, instead of sending inbound requests for processing to the output channel (the conventional way), you can use 'output-channel' or a 'replyChannel' header to send a simple acknowledgment-like reply back to the caller while using the message-publisher feature to initiate a complex flow.

The service in the following example receives a complex payload (which needs to be sent further for processing), but it also needs to reply to the caller with a simple acknowledgment:

```
public String echo(Object complexPayload) {
     return "ACK";
}
```

So, instead of hooking up the complex flow to the output channel, we use the message-publishing feature instead.
We configure it to create a new message, by using the input argument of the service method (shown in the preceding example), and send that to the 'localProcessChannel'.
To make sure this flow is asynchronous, all we need to do is send it to any type of asynchronous channel (`ExecutorChannel` in the next example).
The following example shows how to an asynchronous `publishing-interceptor`:

```
<int:service-activator  input-channel="inputChannel" output-channel="outputChannel" ref="sampleservice"/>

<bean id="sampleservice" class="test.SampleService"/>

<aop:config>
  <aop:advisor advice-ref="interceptor" pointcut="bean(sampleservice)" />
</aop:config>

<int:publishing-interceptor id="interceptor" >
  <int:method pattern="echo" payload="#args[0]" channel="localProcessChannel">
    <int:header name="sample_header" expression="'some sample value'"/>
  </int:method>
</int:publishing-interceptor>

<int:channel id="localProcessChannel">
  <int:dispatcher task-executor="executor"/>
</int:channel>

<task:executor id="executor" pool-size="5"/>
```

Another way of handling this type of scenario is with a wire-tap.
See [Wire Tap](./channel.html#channel-wiretap).

#### Producing and Publishing Messages Based on a Scheduled Trigger

In the preceding sections, we looked at the message-publishing feature, which constructs and publishes messages as by-products of method invocations.
However, in those cases, you are still responsible for invoking the method.
Spring Integration 2.0 added support for scheduled message producers and publishers with the new `expression` attribute on the 'inbound-channel-adapter' element.
You can schedule based on several triggers, any one of which can be configured on the 'poller' element.
Currently, we support `cron`, `fixed-rate`, `fixed-delay` and any custom trigger implemented by you and referenced by the 'trigger' attribute value.

As mentioned earlier, support for scheduled producers and publishers is provided via the `<inbound-channel-adapter>` XML element.
Consider the following example:

```
<int:inbound-channel-adapter id="fixedDelayProducer"
       expression="'fixedDelayTest'"
       channel="fixedDelayChannel">
    <int:poller fixed-delay="1000"/>
</int:inbound-channel-adapter>
```

The preceding example creates an inbound channel adapter that constructs a `Message`, with its payload being the result of the expression defined in the `expression` attribute.
Such messages are created and sent every time the delay specified by the `fixed-delay` attribute occurs.

The following example is similar to the preceding example, except that it uses the `fixed-rate` attribute:

```
<int:inbound-channel-adapter id="fixedRateProducer"
       expression="'fixedRateTest'"
       channel="fixedRateChannel">
    <int:poller fixed-rate="1000"/>
</int:inbound-channel-adapter>
```

The `fixed-rate` attribute lets you send messages at a fixed rate (measuring from the start time of each task).

The following example shows how you can apply a Cron trigger with a value specified in the `cron` attribute:

```
<int:inbound-channel-adapter id="cronProducer"
       expression="'cronTest'"
       channel="cronChannel">
    <int:poller cron="7 6 5 4 3 ?"/>
</int:inbound-channel-adapter>
```

The following example shows how to insert additional headers into the message:

```
<int:inbound-channel-adapter id="headerExpressionsProducer"
       expression="'headerExpressionsTest'"
       channel="headerExpressionsChannel"
       auto-startup="false">
    <int:poller fixed-delay="5000"/>
    <int:header name="foo" expression="6 * 7"/>
    <int:header name="bar" value="x"/>
</int:inbound-channel-adapter>
```

The additional message headers can take scalar values or the results of evaluating Spring expressions.

If you need to implement your own custom trigger, you can use the `trigger` attribute to provide a reference to any spring configured bean that implements the `org.springframework.scheduling.Trigger` interface.
The following example shows how to do so:

```
<int:inbound-channel-adapter id="triggerRefProducer"
       expression="'triggerRefTest'" channel="triggerRefChannel">
    <int:poller trigger="customTrigger"/>
</int:inbound-channel-adapter>

<beans:bean id="customTrigger" class="o.s.scheduling.support.PeriodicTrigger">
    <beans:constructor-arg value="9999"/>
</beans:bean>
```