ip.md 123.5 KB
Newer Older
茶陵後's avatar
茶陵後 已提交
1 2
# TCP and UDP Support 

3
## TCP and UDP Support
茶陵後's avatar
茶陵後 已提交
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28

Spring Integration provides channel adapters for receiving and sending messages over internet protocols.
Both UDP (User Datagram Protocol) and TCP (Transmission Control Protocol) adapters are provided.
Each adapter provides for one-way communication over the underlying protocol.
In addition, Spring Integration provides simple inbound and outbound TCP gateways.
These are used when two-way communication is needed.

You need to include this dependency into your project:

Maven

```
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-ip</artifactId>
    <version>5.5.9</version>
</dependency>
```

Gradle

```
compile "org.springframework.integration:spring-integration-ip:5.5.9"
```

29
### Introduction
茶陵後's avatar
茶陵後 已提交
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60

Two flavors each of UDP inbound and outbound channel adapters are provided:

* `UnicastSendingMessageHandler` sends a datagram packet to a single destination.

* `UnicastReceivingChannelAdapter` receives incoming datagram packets.

* `MulticastSendingMessageHandler` sends (broadcasts) datagram packets to a multicast address.

* `MulticastReceivingChannelAdapter` receives incoming datagram packets by joining to a multicast address.

TCP inbound and outbound channel adapters are provided:

* `TcpSendingMessageHandler` sends messages over TCP.

* `TcpReceivingChannelAdapter` receives messages over TCP.

An inbound TCP gateway is provided.
It allows for simple request-response processing.
While the gateway can support any number of connections, each connection can only be processed serially.
The thread that reads from the socket waits for, and sends, the response before reading again.
If the connection factory is configured for single use connections, the connection is closed after the socket times out.

An outbound TCP gateway is provided.
It allows for simple request-response processing.
If the associated connection factory is configured for single-use connections, a new connection is immediately created for each new request.
Otherwise, if the connection is in use, the calling thread blocks on the connection until either a response is received or a timeout or I/O error occurs.

The TCP and UDP inbound channel adapters and the TCP inbound gateway support the `error-channel` attribute.
This provides the same basic functionality as described in [Enter the `GatewayProxyFactoryBean`](./gateway.html#gateway-proxy).

61
### UDP Adapters
茶陵後's avatar
茶陵後 已提交
62 63 64

This section describes how to configure and use the UDP adapters.

65
#### 
茶陵後's avatar
茶陵後 已提交
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132

The following example configures a UDP outbound channel adapter:

```
<int-ip:udp-outbound-channel-adapter id="udpOut"
    host="somehost"
    port="11111"
    multicast="false"
    socket-customizer="udpCustomizer"
    channel="exampleChannel"/>
```

|   |When setting `multicast` to `true`, you should also provide the multicast address in the host attribute.|
|---|--------------------------------------------------------------------------------------------------------|

UDP is an efficient but unreliable protocol.
Spring Integration adds two attributes to improve reliability: `check-length` and `acknowledge`.
When `check-length` is set to `true`, the adapter precedes the message data with a length field (four bytes in network byte order).
This enables the receiving side to verify the length of the packet received.
If a receiving system uses a buffer that is too short to contain the packet, the packet can be truncated.
The `length` header provides a mechanism to detect this.

Starting with version 4.3, you can set the `port` to `0`, in which case the operating system chooses the port.
The chosen port can be discovered by invoking `getPort()` after the adapter is started and `isListening()` returns `true`.

Starting with version 5.3.3, you can add a `SocketCustomizer` bean to modify the `DatagramSocket` after it is created (for example, call `setTrafficClass(0x10)`).

The following example shows an outbound channel adapter that adds length checking to the datagram packets:

```
<int-ip:udp-outbound-channel-adapter id="udpOut"
    host="somehost"
    port="11111"
    multicast="false"
    check-length="true"
    channel="exampleChannel"/>
```

|   |The recipient of the packet must also be configured to expect a length to precede the actual data.<br/>For a Spring Integration UDP inbound channel adapter, set its `check-length` attribute.|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

The second reliability improvement allows an application-level acknowledgment protocol to be used.
The receiver must send an acknowledgment to the sender within a specified time.

The following example shows an outbound channel adapter that adds length checking to the datagram packets and waits for an acknowledgment:

```
<int-ip:udp-outbound-channel-adapter id="udpOut"
    host="somehost"
    port="11111"
    multicast="false"
    check-length="true"
    acknowledge="true"
    ack-host="thishost"
    ack-port="22222"
    ack-timeout="10000"
    channel="exampleChannel"/>
```

|   |Setting `acknowledge` to `true` implies that the recipient of the packet can interpret the header added to the packet containing acknowledgment data (host and port).<br/>Most likely, the recipient is a Spring Integration inbound channel adapter.|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

|   |When multicast is true, an additional attribute (`min-acks-for-success`) specifies how many acknowledgments must be received within the `ack-timeout`.|
|---|------------------------------------------------------------------------------------------------------------------------------------------------------|

Starting with version 4.3, you can set the `ackPort` to `0`, in which case the operating system chooses the port.

133
#### 
茶陵後's avatar
茶陵後 已提交
134 135 136 137 138 139 140 141 142 143 144 145 146

The following example shows how to configure an outbound UDP adapter with Java:

```
@Bean
@ServiceActivator(inputChannel = "udpOut")
public UnicastSendingMessageHandler handler() {
    return new UnicastSendingMessageHandler("localhost", 11111);
}
```

(or `MulticastSendingChannelAdapter` for multicast).

147
#### 
茶陵後's avatar
茶陵後 已提交
148 149 150 151 152 153 154 155 156 157 158 159

The following example shows how to configure an outbound UDP adapter with the Java DSL:

```
@Bean
public IntegrationFlow udpOutFlow() {
    return f -> f.handle(Udp.outboundAdapter("localhost", 1234)
                    .configureSocket(socket -> socket.setTrafficClass(0x10)))
                .get();
}
```

160
#### 
茶陵後's avatar
茶陵後 已提交
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192

The following example shows how to configure a basic unicast inbound udp channel adapter.

```
<int-ip:udp-inbound-channel-adapter id="udpReceiver"
    channel="udpOutChannel"
    port="11111"
    receive-buffer-size="500"
    multicast="false"
    socket-customizer="udpCustomizer"
    check-length="true"/>
```

The following example shows how to configure a basic multicast inbound udp channel adapter:

```
<int-ip:udp-inbound-channel-adapter id="udpReceiver"
    channel="udpOutChannel"
    port="11111"
    receive-buffer-size="500"
    multicast="true"
    multicast-address="225.6.7.8"
    check-length="true"/>
```

By default, reverse DNS lookups are done on inbound packets to convert IP addresses to host names for use in message headers.
In environments where DNS is not configured, this can cause delays.
You can override this default behavior by setting the `lookup-host` attribute to `false`.

Starting with version 5.3.3, you can add a `SocketCustomizer` bean to modify the `DatagramSocket` after it is created.
It is called for the receiving socket and any sockets created for sending acks.

193
#### 
茶陵後's avatar
茶陵後 已提交
194 195 196 197 198 199 200 201 202 203 204 205 206 207

The following example shows how to configure an inbound UDP adapter with Java:

```
@Bean
public UnicastReceivingChannelAdapter udpIn() {
    UnicastReceivingChannelAdapter adapter = new UnicastReceivingChannelAdapter(11111);
    adapter.setOutputChannelName("udpChannel");
    return adapter;
}
```

The following example shows how to configure an inbound UDP adapter with the Java DSL:

208
#### 
茶陵後's avatar
茶陵後 已提交
209 210 211 212 213 214 215 216 217 218

```
@Bean
public IntegrationFlow udpIn() {
    return IntegrationFlows.from(Udp.inboundAdapter(11111))
            .channel("udpChannel")
            .get();
}
```

219
#### Server Listening Events
茶陵後's avatar
茶陵後 已提交
220 221 222 223 224

Starting with version 5.0.2, a `UdpServerListeningEvent` is emitted when an inbound adapter is started and has begun listening.
This is useful when the adapter is configured to listen on port 0, meaning that the operating system chooses the port.
It can also be used instead of polling `isListening()`, if you need to wait before starting some other process that will connect to the socket.

225
#### Advanced Outbound Configuration
茶陵後's avatar
茶陵後 已提交
226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270

The `<int-ip:udp-outbound-channel-adapter>` (`UnicastSendingMessageHandler`) has `destination-expression` and `socket-expression` options.

You can use the `destination-expression` as a runtime alternative to the hardcoded `host`-`port` pair to determine the destination address for the outgoing datagram packet against a `requestMessage` (with the root object for the evaluation context).
The expression must evaluate to an `URI`, a `String` in the URI style (see [RFC-2396](https://www.ietf.org/rfc/rfc2396.txt)), or a `SocketAddress`.
You can also use the inbound `IpHeaders.PACKET_ADDRESS` header for this expression.
In the framework, the `DatagramPacketMessageMapper` populates this header when we receive datagrams in the `UnicastReceivingChannelAdapter` and convert them to messages.
The header value is exactly the result of `DatagramPacket.getSocketAddress()` of the incoming datagram.

With the `socket-expression`, the outbound channel adapter can use (for example) an inbound channel adapter socket to send datagrams through the same port which they were received.
It is useful in a scenario where our application works as a UDP server and clients operate behind network address translation (NAT).
This expression must evaluate to a `DatagramSocket`.
The `requestMessage` is used as the root object for the evaluation context.
You cannot use the `socket-expression` parameter with the `multicast` and `acknowledge` parameters.
The following example shows how to configure a UDP inbound channel adapter with a transformer that converts to upper case and uses a socket:

```
<int-ip:udp-inbound-channel-adapter id="inbound" port="0" channel="in" />

<int:channel id="in" />

<int:transformer expression="new String(payload).toUpperCase()"
                       input-channel="in" output-channel="out"/>

<int:channel id="out" />

<int-ip:udp-outbound-channel-adapter id="outbound"
                        socket-expression="@inbound.socket"
                        destination-expression="headers['ip_packetAddress']"
                        channel="out" />
```

The following example shows the equivalent configuration with the Java DSL:

```
@Bean
public IntegrationFlow udpEchoUpcaseServer() {
    return IntegrationFlows.from(Udp.inboundAdapter(11111).id("udpIn"))
            .<byte[], String>transform(p -> new String(p).toUpperCase())
            .handle(Udp.outboundAdapter("headers['ip_packetAddress']")
                    .socketExpression("@udpIn.socket"))
            .get();
}
```

271
### TCP Connection Factories
茶陵後's avatar
茶陵後 已提交
272

273
#### Overview
茶陵後's avatar
茶陵後 已提交
274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342

For TCP, the configuration of the underlying connection is provided by using a connection factory.
Two types of connection factory are provided: a client connection factory and a server connection factory.
Client connection factories establish outgoing connections.
Server connection factories listen for incoming connections.

An outbound channel adapter uses a client connection factory, but you can also provide a reference to a client connection factory to an inbound channel adapter.
That adapter receives any incoming messages that are received on connections created by the outbound adapter.

An inbound channel adapter or gateway uses a server connection factory.
(In fact, the connection factory cannot function without one).
You can also provide a reference to a server connection factory to an outbound adapter.
You can then use that adapter to send replies to incoming messages on the same connection.

|   |Reply messages are routed to the connection only if the reply contains the `ip_connectionId` header that was inserted into the original message by the connection factory.|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

|   |This is the extent of message correlation performed when sharing connection factories between inbound and outbound adapters.<br/>Such sharing allows for asynchronous two-way communication over TCP.<br/>By default, only payload information is transferred using TCP.<br/>Therefore, any message correlation must be performed by downstream components such as aggregators or other endpoints.<br/>Support for transferring selected headers was introduced in version 3.0.<br/>For more information, see [TCP Message Correlation](#ip-correlation).|
|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

You may give a reference to a connection factory to a maximum of one adapter of each type.

Spring Integration provides connection factories that use `java.net.Socket` and `java.nio.channel.SocketChannel`.

The following example shows a simple server connection factory that uses `java.net.Socket` connections:

```
<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"/>
```

The following example shows a simple server connection factory that uses `java.nio.channel.SocketChannel` connections:

```
<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    using-nio="true"/>
```

|   |Starting with Spring Integration version 4.2, if the server is configured to listen on a random port (by setting the port to `0`), you can get the actual port chosen by the OS by using `getPort()`.<br/>Also, `getServerSocketAddress()` lets you get the complete `SocketAddress`.<br/>See the [Javadoc for the `TcpServerConnectionFactory` interface](https://docs.spring.io/spring-integration/api/org/springframework/integration/ip/tcp/connection/TcpServerConnectionFactory.html) for more information.|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

```
<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="1234"
    single-use="true"
    so-timeout="10000"/>
```

The following example shows a client connection factory that uses `java.net.Socket` connections and creates a new connection for each message:

```
<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="1234"
    single-use="true"
    so-timeout="10000"
    using-nio=true/>
```

Starting with version 5.2, the client connection factories support the property `connectTimeout`, specified in seconds, which defaults to 60.

Also see [Annotation-Based Configuration](#ip-annotation) and [Using the Java DSL for TCP Components](#ip-dsl).

343
#### 
茶陵後's avatar
茶陵後 已提交
344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449

TCP is a streaming protocol.
This means that some structure has to be provided to data transported over TCP so that the receiver can demarcate the data into discrete messages.
Connection factories are configured to use serializers and deserializers to convert between the message payload and the bits that are sent over TCP.
This is accomplished by providing a deserializer and a serializer for inbound and outbound messages, respectively.
Spring Integration provides a number of standard serializers and deserializers.

`ByteArrayCrlfSerializer`<sup>*</sup> converts a byte array to a stream of bytes followed by carriage return and linefeed characters (`\r\n`).
This is the default serializer (and deserializer) and can be used (for example) with telnet as a client.

The `ByteArraySingleTerminatorSerializer`<sup>*</sup> converts a byte array to a stream of bytes followed by a single termination character (the default is `0x00`).

The `ByteArrayLfSerializer`<sup>*</sup> converts a byte array to a stream of bytes followed by a single linefeed character (`0x0a`).

The `ByteArrayStxEtxSerializer`<sup>*</sup> converts a byte array to a stream of bytes preceded by an STX (`0x02`) and followed by an ETX (`0x03`).

The `ByteArrayLengthHeaderSerializer` converts a byte array to a stream of bytes preceded by a binary length in network byte order (big endian).
This an efficient deserializer because it does not have to parse every byte to look for a termination character sequence.
It can also be used for payloads that contain binary data.
The preceding serializers support only text in the payload.
The default size of the length header is four bytes (an Integer), allowing for messages up to (2^31 - 1) bytes.
However, the `length` header can be a single byte (unsigned) for messages up to 255 bytes, or an unsigned short (2 bytes) for messages up to (2^16 - 1) bytes.
If you need any other format for the header, you can subclass `ByteArrayLengthHeaderSerializer` and provide implementations for the `readHeader` and `writeHeader` methods.
The absolute maximum data size is (2^31 - 1) bytes.
Starting with version 5.2, the header value can include the length of the header in addition to the payload.
Set the `inclusive` property to enable that mechanism (it must be set to the same for producers and consumers).

The `ByteArrayRawSerializer`<sup>*</sup>, converts a byte array to a stream of bytes and adds no additional message demarcation data.
With this serializer (and deserializer), the end of a message is indicated by the client closing the socket in an orderly fashion.
When using this serializer, message reception hangs until the client closes the socket or a timeout occurs.
A timeout does not result in a message.
When this serializer is being used and the client is a Spring Integration application, the client must use a connection factory that is configured with `single-use="true"`.
Doing so causes the adapter to close the socket after sending the message.
The serializer does not, by itself, close the connection.
You should use this serializer only with the connection factories used by channel adapters (not gateways), and the connection factories should be used by either an inbound or outbound adapter but not both.
See also `ByteArrayElasticRawDeserializer`, later in this section.
However, since version 5.2, the outbound gateway has a new property `closeStreamAfterSend`; this allows the use of raw serializers/deserializers because the EOF is signaled to the server, while leaving the connection open to receive the reply.

|   |Before version 4.2.2, when using non-blocking I/O (NIO), this serializer treated a timeout (during read) as an end of file, and the data read so far was emitted as a message.<br/>This is unreliable and should not be used to delimit messages.<br/>It now treats such conditions as an exception.<br/>In the unlikely event that you use it this way, you can restore the previous behavior by setting the `treatTimeoutAsEndOfMessage` constructor argument to `true`.|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

Each of these is a subclass of `AbstractByteArraySerializer`, which implements both `org.springframework.core.serializer.Serializer` and `org.springframework.core.serializer.Deserializer`.
For backwards compatibility, connections that use any subclass of `AbstractByteArraySerializer` for serialization also accept a `String` that is first converted to a byte array.
Each of these serializers and deserializers converts an input stream that contains the corresponding format to a byte array payload.

To avoid memory exhaustion due to a badly behaved client (one that does not adhere to the protocol of the configured serializer), these serializers impose a maximum message size.
If an incoming message exceeds this size, an exception is thrown.
The default maximum message size is 2048 bytes.
You can increase it by setting the `maxMessageSize` property.
If you use the default serializer or deserializer and wish to increase the maximum message size, you must declare the maximum message size as an explicit bean with the `maxMessageSize` property set and configure the connection factory to use that bean.

The classes marked with <sup>*</sup> earlier in this section use an intermediate buffer and copy the decoded data to a final buffer of the correct size.
Starting with version 4.3, you can configure these buffers by setting a `poolSize` property to let these raw buffers be reused instead of being allocated and discarded for each message, which is the default behavior.
Setting the property to a negative value creates a pool that has no bounds.
If the pool is bounded, you can also set the `poolWaitTimeout` property (in milliseconds), after which an exception is thrown if no buffer becomes available.
It defaults to infinity.
Such an exception causes the socket to be closed.

If you wish to use the same mechanism in custom deserializers, you can extend `AbstractPooledBufferByteArraySerializer` (instead of its super class, `AbstractByteArraySerializer`) and implement `doDeserialize()` instead of `deserialize()`.
The buffer is automatically returned to the pool.`AbstractPooledBufferByteArraySerializer` also provides a convenient utility method: `copyToSizedArray()`.

Version 5.0 added the `ByteArrayElasticRawDeserializer`.
This is similar to the deserializer side of `ByteArrayRawSerializer` above, except that it is not necessary to set a `maxMessageSize`.
Internally, it uses a `ByteArrayOutputStream` that lets the buffer grow as needed.
The client must close the socket in an orderly manner to signal end of message.

|   |This deserializer should only be used when the peer is trusted; it is susceptible to a DoS attach due to out of memory conditions.|
|---|----------------------------------------------------------------------------------------------------------------------------------|

The `MapJsonSerializer` uses a Jackson `ObjectMapper` to convert between a `Map` and JSON.
You can use this serializer in conjunction with a `MessageConvertingTcpMessageMapper` and a `MapMessageConverter` to transfer selected headers and the payload in JSON.

|   |The Jackson `ObjectMapper` cannot demarcate messages in the stream.<br/>Therefore, the `MapJsonSerializer` needs to delegate to another serializer or deserializer to handle message demarcation.<br/>By default, a `ByteArrayLfSerializer` is used, resulting in messages with a format of `<json><LF>` on the wire, but you can configure it to use others instead.<br/>(The next example shows how to do so.)|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

The final standard serializer is `org.springframework.core.serializer.DefaultSerializer`, which you can use to convert serializable objects with Java serialization.`org.springframework.core.serializer.DefaultDeserializer` is provided for inbound deserialization of streams that contain serializable objects.

If you do not wish to use the default serializer and deserializer (`ByteArrayCrLfSerializer`), you must set the `serializer` and `deserializer` attributes on the connection factory.
The following example shows how to do so:

```
<bean id="javaSerializer"
      class="org.springframework.core.serializer.DefaultSerializer" />
<bean id="javaDeserializer"
      class="org.springframework.core.serializer.DefaultDeserializer" />

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    deserializer="javaDeserializer"
    serializer="javaSerializer"/>
```

A server connection factory that uses `java.net.Socket` connections and uses Java serialization on the wire.

For full details of the attributes available on connection factories, see [the reference](#ip-annotation) at the end of this section.

By default, reverse DNS lookups are done on inbound packets to convert IP addresses to host names for use in message headers.
In environments where DNS is not configured, this can cause connection delays.
You can override this default behavior by setting the `lookup-host` attribute to `false`.

|   |You can also modify the attributes of sockets and socket factories.<br/>See [SSL/TLS Support](#ssl-tls) for more information.<br/>As noted there, such modifications are possible whether or not SSL is being used.|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

Also see [Annotation-Based Configuration](#ip-annotation) and [Using the Java DSL for TCP Components](#ip-dsl).

450
#### Custom Serializers and Deserializers
茶陵後's avatar
茶陵後 已提交
451 452 453 454 455 456 457 458 459 460

If your data is not in a format supported by one of the standard deserializers, you can implement your own; you can also implement a custom serializer.

To implement a custom serializer and deserializer pair, implement the `org.springframework.core.serializer.Deserializer` and `org.springframework.core.serializer.Serializer` interfaces.

When the deserializer detects a closed input stream between messages, it must throw a `SoftEndOfStreamException`; this is a signal to the framework to indicate that the close was "normal".
If the stream is closed while decoding a message, some other exception should be thrown instead.

Starting with version 5.2, `SoftEndOfStreamException` is now a `RuntimeException` instead of extending `IOException`.

461
#### TCP Caching Client Connection Factory
茶陵後's avatar
茶陵後 已提交
462 463 464 465 466 467 468 469 470

As [noted earlier](#ip-intro), TCP sockets can be 'single-use' (one request or response) or shared.
Shared sockets do not perform well with outbound gateways in high-volume environments, because the socket can only process one request or response at a time.

To improve performance, you can use collaborating channel adapters instead of gateways, but that requires application-level message correlation.
See [TCP Message Correlation](#ip-correlation) for more information.

Spring Integration 2.2 introduced a caching client connection factory, which uses a pool of shared sockets, letting a gateway process multiple concurrent requests with a pool of shared connections.

471
#### TCP Failover Client Connection Factory
茶陵後's avatar
茶陵後 已提交
472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515

You can configure a TCP connection factory that supports failover to one or more other servers.
When sending a message, the factory iterates over all its configured factories until either the message can be sent or no connection can be found.
Initially, the first factory in the configured list is used.
If a connection subsequently fails, the next factory becomes the current factory.
The following example shows how to configure a failover client connection factory:

```
<bean id="failCF" class="o.s.i.ip.tcp.connection.FailoverClientConnectionFactory">
    <constructor-arg>
        <list>
            <ref bean="clientFactory1"/>
            <ref bean="clientFactory2"/>
        </list>
    </constructor-arg>
</bean>
```

|   |When using the failover connection factory, the `singleUse` property must be consistent between the factory itself and the list of factories it is configured to use.|
|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|

The connection factory has two properties related to failing back, when used with a shared connection (`singleUse=false`):

* `refreshSharedInterval`

* `closeOnRefresh`

Consider the following scenario based on the above configuration:
Let’s say `clientFactory1` cannot establish a connection but `clientFactory2` can.
When the `failCF` `getConnection()` method is called after the `refreshSharedInterval` has passed, we will again attempt to connect using `clientFactory1`; if successful, the connection to `clientFactory2` will be closed.
If `closeOnRefresh` is `false`, the "old" connection will remain open and may be reused in future if the first factory fails once more.

Set `refreshSharedInterval` to only attempt to reconnect with the first factory after that time has expired; setting it to `Long.MAX_VALUE` (default) if you only want to fail back to the first factory when the current connection fails.

Set `closeOnRefresh` to close the "old" connection after a refresh actually creates a new connection.

|   |These properties do not apply if any of the delegate factories is a `CachingClientConnectionFactory` because the connection caching is handled there; in that case the list of connection factories will always be consulted to get a connection.|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

Starting with version 5.3, these default to `Long.MAX_VALUE` and `true` so the factory only attempts to fail back when the current connection fails.
To revert to the default behavior of previous versions, set them to `0` and `false`.

Also see [Testing Connections](#testing-connections).

516
#### TCP Thread Affinity Connection Factory
茶陵後's avatar
茶陵後 已提交
517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548

Spring Integration version 5.0 introduced this connection factory.
It binds a connection to the calling thread, and the same connection is reused each time that thread sends a message.
This continues until the connection is closed (by the server or the network) or until the thread calls the `releaseConnection()` method.
The connections themselves are provided by another client factory implementation, which must be configured to provide non-shared (single-use) connections so that each thread gets a connection.

The following example shows how to configure a TCP thread affinity connection factory:

```
@Bean
public TcpNetClientConnectionFactory cf() {
    TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory("localhost",
            Integer.parseInt(System.getProperty(PORT)));
    cf.setSingleUse(true);
    return cf;
}

@Bean
public ThreadAffinityClientConnectionFactory tacf() {
    return new ThreadAffinityClientConnectionFactory(cf());
}

@Bean
@ServiceActivator(inputChannel = "out")
public TcpOutboundGateway outGate() {
    TcpOutboundGateway outGate = new TcpOutboundGateway();
    outGate.setConnectionFactory(tacf());
    outGate.setReplyChannelName("toString");
    return outGate;
}
```

549
### Testing Connections
茶陵後's avatar
茶陵後 已提交
550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600

In some scenarios, it can be useful to send some kind of health-check request when a connection is first opened.
One such scenario might be when using a [TCP Failover Client Connection Factory](#failover-cf) so that we can fail over if the selected server allowed a connection to be opened but reports that it is not healthy.

In order to support this feature, add a `connectionTest` to the client connection factory.

```
/**
 * Set a {@link Predicate} that will be invoked to test a new connection; return true
 * to accept the connection, false the reject.
 * @param connectionTest the predicate.
 * @since 5.3
 */
public void setConnectionTest(@Nullable Predicate<TcpConnectionSupport> connectionTest) {
    this.connectionTest = connectionTest;
}
```

To test the connection, attach a temporary listener to the connection within the test.
If the test fails, the connection is closed and an exception thrown.
When used with the [TCP Failover Client Connection Factory](#failover-cf) this triggers trying the next server.

|   |Only the first reply from the server will go to the test listener.|
|---|------------------------------------------------------------------|

In the following example, the server is considered healthy if the server replies `PONG` when we send `PING`.

```
Message<String> ping = new GenericMessage<>("PING");
byte[] pong = "PONG".getBytes();
clientFactory.setConnectionTest(conn -> {
    CountDownLatch latch = new CountDownLatch(1);
    AtomicBoolean result = new AtomicBoolean();
    conn.registerTestListener(msg -> {
        if (Arrays.equals(pong, (byte[]) msg.getPayload())) {
            result.set(true);
        }
        latch.countDown();
        return false;
    });
    conn.send(ping);
    try {
        latch.await(10, TimeUnit.SECONDS);
    }
    catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    return result.get();
});
```

601
### TCP Connection Interceptors
茶陵後's avatar
茶陵後 已提交
602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654

You can configure connection factories with a reference to a `TcpConnectionInterceptorFactoryChain`.
You can use interceptors to add behavior to connections, such as negotiation, security, and other options.
No interceptors are currently provided by the framework, but see [`InterceptedSharedConnectionTests` in the source repository](https://github.com/spring-projects/spring-integration/blob/main/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/InterceptedSharedConnectionTests.java) for an example.

The `HelloWorldInterceptor` used in the test case works as follows:

The interceptor is first configured with a client connection factory.
When the first message is sent over an intercepted connection, the interceptor sends 'Hello' over the connection and expects to receive 'world!'.
When that occurs, the negotiation is complete and the original message is sent.
Further messages that use the same connection are sent without any additional negotiation.

When configured with a server connection factory, the interceptor requires the first message to be 'Hello' and, if it is, returns 'world!'.
Otherwise it throws an exception that causes the connection to be closed.

All `TcpConnection` methods are intercepted.
Interceptor instances are created for each connection by an interceptor factory.
If an interceptor is stateful, the factory should create a new instance for each connection.
If there is no state, the same interceptor can wrap each connection.
Interceptor factories are added to the configuration of an interceptor factory chain, which you can provide to a connection factory by setting the `interceptor-factory` attribute.
Interceptors must extend `TcpConnectionInterceptorSupport`.
Factories must implement the `TcpConnectionInterceptorFactory` interface.`TcpConnectionInterceptorSupport` has passthrough methods.
By extending this class, you only need to implement those methods you wish to intercept.

The following example shows how to configure a connection interceptor factory chain:

```
<bean id="helloWorldInterceptorFactory"
    class="o.s.i.ip.tcp.connection.TcpConnectionInterceptorFactoryChain">
    <property name="interceptors">
        <array>
            <bean class="o.s.i.ip.tcp.connection.HelloWorldInterceptorFactory"/>
        </array>
    </property>
</bean>

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="12345"
    using-nio="true"
    single-use="true"
    interceptor-factory-chain="helloWorldInterceptorFactory"/>

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="12345"
    single-use="true"
    so-timeout="100000"
    using-nio="true"
    interceptor-factory-chain="helloWorldInterceptorFactory"/>
```

655
### TCP Connection Events
茶陵後's avatar
茶陵後 已提交
656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690

Beginning with version 3.0, changes to `TcpConnection` instances are reported by `TcpConnectionEvent` instances.`TcpConnectionEvent` is a subclass of `ApplicationEvent` and can thus be received by any `ApplicationListener` defined in the `ApplicationContext` — for example [an event inbound channel adapter](./event.html#appevent-inbound).

`TcpConnectionEvents` have the following properties:

* `connectionId`: The connection identifier, which you can use in a message header to send data to the connection.

* `connectionFactoryName`: The bean name of the connection factory to which the connection belongs.

* `throwable`: The `Throwable` (for `TcpConnectionExceptionEvent` events only).

* `source`: The `TcpConnection`.
  You can use this, for example, to determine the remote IP Address with `getHostAddress()` (cast required).

In addition, since version 4.0, the standard deserializers discussed in [TCP Connection Factories](#tcp-connection-factories) now emit `TcpDeserializationExceptionEvent` instances when they encounter problems while decoding the data stream.
These events contain the exception, the buffer that was in the process of being built, and an offset into the buffer (if available) at the point where the exception occurred.
Applications can use a normal `ApplicationListener` or an `ApplicationEventListeningMessageProducer` (see [Receiving Spring Application Events](./event.html#appevent-inbound)) to capture these events, allowing analysis of the problem.

Starting with versions 4.0.7 and 4.1.3, `TcpConnectionServerExceptionEvent` instances are published whenever an unexpected exception occurs on a server socket (such as a `BindException` when the server socket is in use).
These events have a reference to the connection factory and the cause.

Starting with version 4.2, `TcpConnectionFailedCorrelationEvent` instances are published whenever an endpoint (inbound gateway or collaborating outbound channel adapter) receives a message that cannot be routed to a connection because the `ip_connectionId` header is invalid.
Outbound gateways also publish this event when a late reply is received (the sender thread has timed out).
The event contains the connection ID as well as an exception in the `cause` property, which contains the failed message.

Starting with version 4.3, a `TcpConnectionServerListeningEvent` is emitted when a server connection factory is started.
This is useful when the factory is configured to listen on port 0, meaning that the operating system chooses the port.
It can also be used instead of polling `isListening()`, if you need to wait before starting some other process that connects to the socket.

|   |To avoid delaying the listening thread from accepting connections, the event is published on a separate thread.|
|---|---------------------------------------------------------------------------------------------------------------|

Starting with version 4.3.2, a `TcpConnectionFailedEvent` is emitted whenever a client connection cannot be created.
The source of the event is the connection factory, which you can use to determine the host and port to which the connection could not be established.

691
### TCP Adapters
茶陵後's avatar
茶陵後 已提交
692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777

TCP inbound and outbound channel adapters that use connection factories [mentioned earlier](#tcp-events) are provided.
These adapters have two relevant attributes: `connection-factory` and `channel`.
The `connection-factory` attribute indicates which connection factory is to be used to manage connections for the adapter.
The `channel` attribute specifies the channel on which messages arrive at an outbound adapter and on which messages are placed by an inbound adapter.
While both inbound and outbound adapters can share a connection factory, server connection factories are always “owned” by an inbound adapter.
Client connection factories are always “owned” by an outbound adapter.
Only one adapter of each type may get a reference to a connection factory.
The following example shows how to define client and server TCP connection factories:

```
<bean id="javaSerializer"
      class="org.springframework.core.serializer.DefaultSerializer"/>
<bean id="javaDeserializer"
      class="org.springframework.core.serializer.DefaultDeserializer"/>

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    deserializer="javaDeserializer"
    serializer="javaSerializer"
    using-nio="true"
    single-use="true"/>

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="#{server.port}"
    single-use="true"
    so-timeout="10000"
    deserializer="javaDeserializer"
    serializer="javaSerializer"/>

<int:channel id="input" />

<int:channel id="replies">
    <int:queue/>
</int:channel>

<int-ip:tcp-outbound-channel-adapter id="outboundClient"
    channel="input"
    connection-factory="client"/>

<int-ip:tcp-inbound-channel-adapter id="inboundClient"
    channel="replies"
    connection-factory="client"/>

<int-ip:tcp-inbound-channel-adapter id="inboundServer"
    channel="loop"
    connection-factory="server"/>

<int-ip:tcp-outbound-channel-adapter id="outboundServer"
    channel="loop"
    connection-factory="server"/>

<int:channel id="loop"/>
```

Also see [Annotation-Based Configuration](#ip-annotation) and [Using the Java DSL for TCP Components](#ip-dsl).

In the preceding configuration, messages arriving in the `input` channel are serialized over connections created by `client` connection factory, received at the server, and placed on the `loop` channel.
Since `loop` is the input channel for `outboundServer`, the message is looped back over the same connection, received by `inboundClient`, and deposited in the `replies` channel.
Java serialization is used on the wire.

Normally, inbound adapters use a `type="server"` connection factory, which listens for incoming connection requests.
In some cases, you may want to establish the connection in reverse, such that the inbound adapter connects to an external server and then waits for inbound messages on that connection.

This topology is supported by setting `client-mode="true"` on the inbound adapter.
In this case, the connection factory must be of type `client` and must have `single-use` set to `false`.

Two additional attributes support this mechanism.
The `retry-interval` specifies (in milliseconds) how often the framework attempts to reconnect after a connection failure.
The `scheduler` supplies a `TaskScheduler` to schedule the connection attempts and to test that the connection is still active.

If you don’t provide a scheduler, the framework’s default [taskScheduler](./configuration.html#namespace-taskscheduler) bean is used.

For an outbound adapter, the connection is normally established when the first message is sent.
The `client-mode="true"` on an outbound adapter causes the connection to be established when the adapter is started.
By default, adapters are automatically started.
Again, the connection factory must be of type `client` and have `single-use="false"`.
A `retry-interval` and `scheduler` are also supported.
If a connection fails, it is re-established either by the scheduler or when the next message is sent.

For both inbound and outbound, if the adapter is started, you can force the adapter to establish a connection by sending a `<control-bus />` command: `@adapter_id.retryConnection()`.
Then you can examine the current state with `@adapter_id.isClientModeConnected()`.

778
### TCP Gateways
茶陵後's avatar
茶陵後 已提交
779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848

The inbound TCP gateway `TcpInboundGateway` and outbound TCP gateway `TcpOutboundGateway` use a server and client connection factory, respectively.
Each connection can process a single request or response at a time.

The inbound gateway, after constructing a message with the incoming payload and sending it to the `requestChannel`, waits for a response and sends the payload from the response message by writing it to the connection.

|   |For the inbound gateway, you must retain or populate, the `ip_connectionId` header, because it is used to correlate the message to a connection.<br/>Messages that originate at the gateway automatically have the header set.<br/>If the reply is constructed as a new message, you need to set the header.<br/>The header value can be captured from the incoming message.|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

As with inbound adapters, inbound gateways normally use a `type="server"` connection factory, which listens for incoming connection requests.
In some cases, you may want to establish the connection in reverse, such that the inbound gateway connects to an external server and then waits for and replies to inbound messages on that connection.

This topology is supported by using `client-mode="true"` on the inbound gateway.
In this case, the connection factory must be of type `client` and must have `single-use` set to `false`.

Two additional attributes support this mechanism.`retry-interval` specifies (in milliseconds) how often the framework tries to reconnect after a connection failure.`scheduler` supplies a `TaskScheduler` to schedule the connection attempts and to test that the connection is still active.

If the gateway is started, you may force the gateway to establish a connection by sending a `<control-bus/>` command: `@adapter_id.retryConnection()` and examine the current state with `@adapter_id.isClientModeConnected()`.

The outbound gateway, after sending a message over the connection, waits for a response, constructs a response message, and puts it on the reply channel.
Communications over the connections are single-threaded.
Only one message can be handled at a time.
If another thread attempts to send a message before the current response has been received, it blocks until any previous requests are complete (or time out).
If, however, the client connection factory is configured for single-use connections, each new request gets its own connection and is processed immediately.
The following example configures an inbound TCP gateway:

```
<int-ip:tcp-inbound-gateway id="inGateway"
    request-channel="tcpChannel"
    reply-channel="replyChannel"
    connection-factory="cfServer"
    reply-timeout="10000"/>
```

If a connection factory configured with the default serializer or deserializer is used, messages is `\r\n` delimited data and the gateway can be used by a simple client such as telnet.

The following example shows an outbound TCP gateway:

```
<int-ip:tcp-outbound-gateway id="outGateway"
    request-channel="tcpChannel"
    reply-channel="replyChannel"
    connection-factory="cfClient"
    request-timeout="10000"
    remote-timeout="10000"/> <!-- or e.g.
remote-timeout-expression="headers['timeout']" -->
```

`client-mode` is not currently available with the outbound gateway.

Starting with version 5.2, the outbound gateway can be configured with the property `closeStreamAfterSend`.
If the connection factory is configured for `single-use` (a new connection for each request/reply) the gateway will close the output stream; this signals EOF to the server.
This is useful if the server uses the EOF to determine the end of message, rather than some delimiter in the stream, but leaves the connection open in order to receive the reply.

Normally, the calling thread will block in the gateway, waiting for the reply (or a timeout).
Starting with version 5.3, you can set the `async` property on the gateway and the sending thread is released to do other work.
The reply (or error) will be sent on the receiving thread.
This only applies when using the `TcpNetClientConnectionFactory`, it is ignored when using NIO because there is a race condition whereby a socket error that occurs after the reply is received can be passed to the gateway before the reply.

|   |When using a shared connection (`singleUse=false`), a new request, while another is in process, will be blocked until the current reply is received.<br/>Consider using the `CachingClientConnectionFactory` if you wish to support concurrent requests on a pool of long-lived connections.|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

Starting with version 5.4, the inbound can be configured with an `unsolicitedMessageChannel`.
Unsolicited inbound messages will be sent to this channel, as well as late replies (where the client timed out).
To support this on the server side, you can now register multiple `TcpSender` s with the connection factory.
Gateways and Channel Adapters automatically register themselves.
When sending unsolicited messages from the server, you must add the appropriate `IpHeaders.CONNECTION_ID` to the messages sent.

Also see [Annotation-Based Configuration](#ip-annotation) and [Using the Java DSL for TCP Components](#ip-dsl).

849
### TCP Message Correlation
茶陵後's avatar
茶陵後 已提交
850 851 852 853 854 855 856 857

One goal of the IP endpoints is to provide communication with systems other than Spring Integration applications.
For this reason, only message payloads are sent and received by default.
Since 3.0, you can transfer headers by using JSON, Java serialization, or custom serializers and deserializers.
See [Transferring Headers](#ip-headers) for more information.
No message correlation is provided by the framework (except when using the gateways) or collaborating channel adapters on the server side.[Later in this document](#ip-collaborating-adapters), we discuss the various correlation techniques available to applications.
In most cases, this requires specific application-level correlation of messages, even when message payloads contain some natural correlation data (such as an order number).

858
#### Gateways
茶陵後's avatar
茶陵後 已提交
859 860 861 862 863 864 865 866 867 868 869 870 871

Gateways automatically correlate messages.
However, you should use an outbound gateway for relatively low-volume applications.
When you configure the connection factory to use a single shared connection for all message pairs ('single-use="false"'), only one message can be processed at a time.
A new message has to wait until the reply to the previous message has been received.
When a connection factory is configured for each new message to use a new connection ('single-use="true"'), this restriction does not apply.
While this setting can give higher throughput than a shared connection environment, it comes with the overhead of opening and closing a new connection for each message pair.

Therefore, for high-volume messages, consider using a collaborating pair of channel adapters.
However, to do so, you need to provide collaboration logic.

Another solution, introduced in Spring Integration 2.2, is to use a `CachingClientConnectionFactory`, which allows the use of a pool of shared connections.

872
#### Collaborating Outbound and Inbound Channel Adapters
茶陵後's avatar
茶陵後 已提交
873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901

To achieve high-volume throughput (avoiding the pitfalls of using gateways, as [mentioned earlier](#ip-gateways)) you can configure a pair of collaborating outbound and inbound channel adapters.
You can also use collaborating adapters (server-side or client-side) for totally asynchronous communication (rather than with request-reply semantics).
On the server side, message correlation is automatically handled by the adapters, because the inbound adapter adds a header that allows the outbound adapter to determine which connection to use when sending the reply message.

|   |On the server side, you must populate the `ip_connectionId` header, because it is used to correlate the message to a connection.<br/>Messages that originate at the inbound adapter automatically have the header set.<br/>If you wish to construct other messages to send, you need to set the header.<br/>You can get the header value from an incoming message.|
|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

On the client side, the application must provide its own correlation logic, if needed.
You can do so in a number of ways.

If the message payload has some natural correlation data (such as a transaction ID or an order number) and you have no need to retain any information (such as a reply channel header) from the original outbound message, the correlation is simple and would be done at the application level in any case.

If the message payload has some natural correlation data (such as a transaction ID or an order number), but you need to retain some information (such as a reply channel header) from the original outbound message, you can retain a copy of the original outbound message (perhaps by using a publish-subscribe channel) and use an aggregator to recombine the necessary data.

For either of the previous two scenarios, if the payload has no natural correlation data, you can provide a transformer upstream of the outbound channel adapter to enhance the payload with such data.
Such a transformer may transform the original payload to a new object that contains both the original payload and some subset of the message headers.
Of course, live objects (such as reply channels) from the headers cannot be included in the transformed payload.

If you choose such a strategy, you need to ensure the connection factory has an appropriate serializer-deserializer pair to handle such a payload (such as `DefaultSerializer` and `DefaultDeserializer`, which use java serialization, or a custom serializer and deserializer).
The `ByteArray*Serializer` options mentioned in [TCP Connection Factories](#tcp-connection-factories), including the default `ByteArrayCrLfSerializer`, do not support such payloads unless the transformed payload is a `String` or `byte[]`.

|   |Before the 2.2 release, when collaborating channel adapters used a client connection factory, the `so-timeout` attribute defaulted to the default reply timeout (10 seconds).<br/>This meant that, if no data were received by the inbound adapter for this period of time, the socket was closed.<br/><br/>This default behavior was not appropriate in a truly asynchronous environment, so it now defaults to an infinite timeout.<br/>You can reinstate the previous default behavior by setting the `so-timeout` attribute on the client connection factory to 10000 milliseconds.|
|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

Starting with version 5.4, multiple outbound channel adapters and one `TcpInboundChannelAdapter` can share the same connection factory.
This allows an application to support both request/reply and arbitrary server → client messaging.
See [TCP Gateways](#tcp-gateways) for more information.

902
#### Transferring Headers
茶陵後's avatar
茶陵後 已提交
903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966

TCP is a streaming protocol.`Serializers` and `Deserializers` demarcate messages within the stream.
Prior to 3.0, only message payloads (`String` or `byte[]`) could be transferred over TCP.
Beginning with 3.0, you can transfer selected headers as well as the payload.
However, “live” objects, such as the `replyChannel` header, cannot be serialized.

Sending header information over TCP requires some additional configuration.

The first step is to provide the `ConnectionFactory` with a `MessageConvertingTcpMessageMapper` that uses the `mapper` attribute.
This mapper delegates to any `MessageConverter` implementation to convert the message to and from some object that can be serialized and deserialized by the configured `serializer` and `deserializer`.

Spring Integration provides a `MapMessageConverter`, which allows the specification of a list of headers that are added to a `Map` object, along with the payload.
The generated Map has two entries: `payload` and `headers`.
The `headers` entry is itself a `Map` and contains the selected headers.

The second step is to provide a serializer and a deserializer that can convert between a `Map` and some wire format.
This can be a custom `Serializer` or `Deserializer`, which you typically need if the peer system is not a Spring Integration application.

Spring Integration provides a `MapJsonSerializer` to convert a `Map` to and from JSON.
It uses a Spring Integration `JsonObjectMapper`.
You can provide a custom `JsonObjectMapper` if needed.
By default, the serializer inserts a linefeed (`0x0a`) character between objects.
See the [Javadoc](https://docs.spring.io/spring-integration/api/org/springframework/integration/ip/tcp/serializer/MapJsonSerializer.html) for more information.

|   |The `JsonObjectMapper` uses whichever version of `Jackson` is on the classpath.|
|---|-------------------------------------------------------------------------------|

You can also use standard Java serialization of the `Map`, by using the `DefaultSerializer` and `DefaultDeserializer`.

The following example shows the configuration of a connection factory that transfers the `correlationId`, `sequenceNumber`, and `sequenceSize` headers by using JSON:

```
<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="12345"
    mapper="mapper"
    serializer="jsonSerializer"
    deserializer="jsonSerializer"/>

<bean id="mapper"
      class="o.sf.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
    <constructor-arg name="messageConverter">
        <bean class="o.sf.integration.support.converter.MapMessageConverter">
            <property name="headerNames">
                <list>
                    <value>correlationId</value>
                    <value>sequenceNumber</value>
                    <value>sequenceSize</value>
                </list>
            </property>
        </bean>
    </constructor-arg>
</bean>

<bean id="jsonSerializer" class="o.sf.integration.ip.tcp.serializer.MapJsonSerializer" />
```

A message sent with the preceding configuration, with a payload of 'something' would appear on the wire as follows:

```
{"headers":{"correlationId":"things","sequenceSize":5,"sequenceNumber":1},"payload":"something"}
```

967
### 
茶陵後's avatar
茶陵後 已提交
968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990

Using NIO (see `using-nio` in [IP Configuration Attributes](#ip-endpoint-reference)) avoids dedicating a thread to read from each socket.
For a small number of sockets, you are likely to find that not using NIO, together with an asynchronous hand-off (such as to a `QueueChannel`), performs as well as or better than using NIO.

You should consider using NIO when handling a large number of connections.
However, the use of NIO has some other ramifications.
A pool of threads (in the task executor) is shared across all the sockets.
Each incoming message is assembled and sent to the configured channel as a separate unit of work on a thread selected from that pool.
Two sequential messages arriving on the same socket might be processed by different threads.
This means that the order in which the messages are sent to the channel is indeterminate.
Strict ordering of the messages arriving on the socket is not maintained.

For some applications, this is not an issue.
For others, it is a problem.
If you require strict ordering, consider setting `using-nio` to `false` and using an asynchronous hand-off.

Alternatively, you can insert a resequencer downstream of the inbound endpoint to return the messages to their proper sequence.
If you set `apply-sequence` to `true` on the connection factory, messages arriving on a TCP connection have `sequenceNumber` and `correlationId` headers set.
The resequencer uses these headers to return the messages to their proper sequence.

|   |Starting with version 5.1.4, priority is given to accepting new connections over reading from existing connections.<br/>This should, generally, have little impact unless you have a very high rate of new incoming connections.<br/>If you wish to revert to the previous behavior of giving reads priority, set the `multiAccept` property on the `TcpNioServerConnectionFactory` to `false`.|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

991
#### Pool Size
茶陵後's avatar
茶陵後 已提交
992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008

The pool size attribute is no longer used.
Previously, it specified the size of the default thread pool when a task-executor was not specified.
It was also used to set the connection backlog on server sockets.
The first function is no longer needed (see the next paragraph).
The second function is replaced by the `backlog` attribute.

Previously, when using a fixed thread pool task executor (which was the default) with NIO, it was possible to get a deadlock and processing would stop.
The problem occurred when a buffer was full, a thread reading from the socket was trying to add more data to the buffer, and no threads were available to make space in the buffer.
This only occurred with a very small pool size, but it could be possible under extreme conditions.
Since 2.2, two changes have eliminated this problem.
First, the default task executor is a cached thread pool executor.
Second, deadlock detection logic has been added such that, if thread starvation occurs, instead of deadlocking, an exception is thrown, thus releasing the deadlocked resources.

|   |Now that the default task executor is unbounded, it is possible that an out-of-memory condition might occur with high rates of incoming messages, if message processing takes extended time.<br/>If your application exhibits this type of behavior, you should use a pooled task executor with an appropriate pool size, but see [the next section](#io-thread-pool-task-executor-caller-runs).|
|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

1009
#### Thread Pool Task Executor with `CALLER_RUNS` Policy
茶陵後's avatar
茶陵後 已提交
1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100

You should keep in mind some important considerations when you use a fixed thread pool with the `CallerRunsPolicy` (`CALLER_RUNS` when using the `<task/>` namespace) and the queue capacity is small.

The following does not apply if you do not use a fixed thread pool.

With NIO connections, there are three distinct task types.
The I/O selector processing is performed on one dedicated thread (detecting events, accepting new connections, and dispatching the I/O read operations to other threads by using the task executor).
When an I/O reader thread (to which the read operation is dispatched) reads data, it hands off to another thread to assemble the incoming message.
Large messages can take several reads to complete.
These “assembler” threads can block while waiting for data.
When a new read event occurs, the reader determines if this socket already has an assembler and, if not, runs a new one.
When the assembly process is complete, the assembler thread is returned to the pool.

This can cause a deadlock when the pool is exhausted, the `CALLER_RUNS` rejection policy is in use, and the task queue is full.
When the pool is empty and there is no room in the queue, the IO selector thread receives an `OP_READ` event and dispatches the read by using the executor.
The queue is full, so the selector thread itself starts the read process.
Now it detects that there is no assembler for this socket and, before it does the read, fires off an assembler.
Again, the queue is full, and the selector thread becomes the assembler.
The assembler is now blocked, waiting for the data to be read, which never happens.
The connection factory is now deadlocked because the selector thread cannot handle new events.

To avoid this deadlock, we must avoid the selector (or reader) threads performing the assembly task.
We want to use separate pools for the IO and assembly operations.

The framework provides a `CompositeExecutor`, which allows the configuration of two distinct executors: one for performing IO operations and one for message assembly.
In this environment, an IO thread can never become an assembler thread, and the deadlock cannot occur.

In addition, the task executors should be configured to use an `AbortPolicy` (`ABORT` when using `<task>`).
When an I/O task cannot be completed, it is deferred for a short time and continually retried until it can be completed and have an assembler allocated.
By default, the delay is 100ms, but you can change it by setting the `readDelay` property on the connection factory (`read-delay` when configuring with the XML namespace).

The following three examples shows how to configure the composite executor:

```
@Bean
private CompositeExecutor compositeExecutor() {
    ThreadPoolTaskExecutor ioExec = new ThreadPoolTaskExecutor();
    ioExec.setCorePoolSize(4);
    ioExec.setMaxPoolSize(10);
    ioExec.setQueueCapacity(0);
    ioExec.setThreadNamePrefix("io-");
    ioExec.setRejectedExecutionHandler(new AbortPolicy());
    ioExec.initialize();
    ThreadPoolTaskExecutor assemblerExec = new ThreadPoolTaskExecutor();
    assemblerExec.setCorePoolSize(4);
    assemblerExec.setMaxPoolSize(10);
    assemblerExec.setQueueCapacity(0);
    assemblerExec.setThreadNamePrefix("assembler-");
    assemblerExec.setRejectedExecutionHandler(new AbortPolicy());
    assemblerExec.initialize();
    return new CompositeExecutor(ioExec, assemblerExec);
}
```

```
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
    <constructor-arg ref="io"/>
    <constructor-arg ref="assembler"/>
</bean>

<task:executor id="io" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<task:executor id="assembler" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
```

```
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
    <constructor-arg>
        <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="threadNamePrefix" value="io-" />
            <property name="corePoolSize" value="4" />
            <property name="maxPoolSize" value="8" />
            <property name="queueCapacity" value="0" />
            <property name="rejectedExecutionHandler">
                <bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
            </property>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="threadNamePrefix" value="assembler-" />
            <property name="corePoolSize" value="4" />
            <property name="maxPoolSize" value="10" />
            <property name="queueCapacity" value="0" />
            <property name="rejectedExecutionHandler">
                <bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
            </property>
        </bean>
    </constructor-arg>
</bean>
```

1101
### SSL/TLS Support
茶陵後's avatar
茶陵後 已提交
1102 1103 1104 1105 1106 1107 1108

Secure Sockets Layer/Transport Layer Security is supported.
When using NIO, the JDK 5+ `SSLEngine` feature is used to handle handshaking after the connection is established.
When not using NIO, standard `SSLSocketFactory` and `SSLServerSocketFactory` objects are used to create connections.
A number of strategy interfaces are provided to allow significant customization.
The default implementations of these interfaces provide for the simplest way to get started with secure communications.

1109
#### Getting Started
茶陵後's avatar
茶陵後 已提交
1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161

Regardless of whether you use NIO, you need to configure the `ssl-context-support` attribute on the connection factory.
This attribute references a \<bean/\> definition that describes the location and passwords for the required key stores.

SSL/TLS peers require two key stores each:

* A keystore that contains private and public key pairs to identify the peer

* A truststore that contains the public keys for peers that are trusted.
  See the documentation for the `keytool` utility provided with the JDK.
  The essential steps are

  1. Create a new key pair and store it in a keystore.

  2. Export the public key.

  3. Import the public key into the peer’s truststore.

  4. Repeat for the other peer.

|   |It is common in test cases to use the same key stores on both peers, but this should be avoided for production.|
|---|---------------------------------------------------------------------------------------------------------------|

After establishing the key stores, the next step is to indicate their locations to the `TcpSSLContextSupport` bean and provide a reference to that bean to the connection factory.

The following example configures an SSL connection:

```
<bean id="sslContextSupport"
    class="o.sf.integration.ip.tcp.connection.support.DefaultTcpSSLContextSupport">
    <constructor-arg value="client.ks"/>
    <constructor-arg value="client.truststore.ks"/>
    <constructor-arg value="secret"/>
    <constructor-arg value="secret"/>
</bean>

<ip:tcp-connection-factory id="clientFactory"
    type="client"
    host="localhost"
    port="1234"
    ssl-context-support="sslContextSupport" />
```

The `DefaultTcpSSLContextSupport` class also has an optional `protocol` property, which can be `SSL` or `TLS` (the default).

The keystore file names (the first two constructor arguments) use the Spring `Resource` abstraction.
By default, the files are located on the classpath, but you can override this by using the `file:` prefix (to find the files on the filesystem instead).

Starting with version 4.3.6, when you use NIO, you can specify an `ssl-handshake-timeout` (in seconds) on the connection factory.
This timeout (the default is 30 seconds) is used during SSL handshake when waiting for data.
If the timeout is exceeded, the process is stopped and the socket is closed.

1162
#### Host Verification
茶陵後's avatar
茶陵後 已提交
1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193

Starting with version 5.0.8, you can configure whether or not to enable host verification.
Starting with version 5.1, it is enabled by default; the mechanism to disable it depends on whether or not you are using NIO.

Host verification is used to ensure the server you are connected to matches information in the certificate, even if the certificate is trusted.

When using NIO, configure the `DefaultTcpNioSSLConnectionSupport`, for example.

```
@Bean
public DefaultTcpNioSSLConnectionSupport connectionSupport() {
    DefaultTcpSSLContextSupport sslContextSupport = new DefaultTcpSSLContextSupport("test.ks",
            "test.truststore.ks", "secret", "secret");
    sslContextSupport.setProtocol("SSL");
    DefaultTcpNioSSLConnectionSupport tcpNioConnectionSupport =
            new DefaultTcpNioSSLConnectionSupport(sslContextSupport, false);
    return tcpNioConnectionSupport;
}
```

The second constructor argument disables host verification.
The `connectionSupport` bean is then injected into the NIO connection factory.

When not using NIO, the configuration is in the `TcpSocketSupport`:

```
connectionFactory.setTcpSocketSupport(new DefaultTcpSocketSupport(false));
```

Again, the constructor argument disables host verification.

1194
### Advanced Techniques
茶陵後's avatar
茶陵後 已提交
1195 1196 1197

This section covers advanced techniques that you may find to be helpful in certain situations.

1198
#### Strategy Interfaces
茶陵後's avatar
茶陵後 已提交
1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212

In many cases, the configuration described earlier is all that is needed to enable secure communication over TCP/IP.
However, Spring Integration provides a number of strategy interfaces to allow customization and modification of socket factories and sockets:

* `TcpSSLContextSupport`

* `TcpSocketFactorySupport`

* `TcpSocketSupport`

* `TcpNetConnectionSupport`

* `TcpNioConnectionSupport`

1213
##### The `TcpSSLContextSupport` Strategy Interface
茶陵後's avatar
茶陵後 已提交
1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228

The following listing shows the `TcpSSLContextSupport` strategy interface:

```
public interface TcpSSLContextSupport {

    SSLContext getSSLContext() throws Exception;

}
```

Implementations of the `TcpSSLContextSupport` interface are responsible for creating an `SSLContext` object.
The implementation provided by the framework is the `DefaultTcpSSLContextSupport`, [described earlier](#ip-ssl-tls-getting-started).
If you require different behavior, implement this interface and provide the connection factory with a reference to a bean of your class' implementation.

1229
##### The `TcpSocketFactorySupport` Strategy Interface
茶陵後's avatar
茶陵後 已提交
1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253

The following listing shows the definition of the `TcpSocketFactorySupport` strategy interface:

```
public interface TcpSocketFactorySupport {

    ServerSocketFactory getServerSocketFactory();

    SocketFactory getSocketFactory();

}
```

Implementations of this interface are responsible for obtaining references to `ServerSocketFactory` and `SocketFactory`.
Two implementations are provided.
The first is `DefaultTcpNetSocketFactorySupport` for non-SSL sockets (when no `ssl-context-support` attribute is defined).
This uses the JDK’s default factories.
The second implementation is `DefaultTcpNetSSLSocketFactorySupport`.
By default, this is used when an `ssl-context-support` attribute is defined.
It uses the `SSLContext` created by that bean to create the socket factories.

|   |This interface applies only if `using-nio` is `false`.<br/>NIO does not use socket factories.|
|---|---------------------------------------------------------------------------------------------|

1254
##### The `TcpSocketSupport` Strategy Interface
茶陵後's avatar
茶陵後 已提交
1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274

The following listing shows the definition of the `TcpSocketSupport` strategy interface:

```
public interface TcpSocketSupport {

    void postProcessServerSocket(ServerSocket serverSocket);

    void postProcessSocket(Socket socket);

}
```

Implementations of this interface can modify sockets after they are created and after all configured attributes have been applied but before the sockets are used.
This applies whether you use NIO or not.
For example, you could use an implementation of this interface to modify the supported cipher suites on an SSL socket, or you could add a listener that gets notified after SSL handshaking is complete.
The sole implementation provided by the framework is the `DefaultTcpSocketSupport`, which does not modify the sockets in any way.

To supply your own implementation of `TcpSocketFactorySupport` or `TcpSocketSupport`, provide the connection factory with references to beans of your custom type by setting the `socket-factory-support` and `socket-support` attributes, respectively.

1275
##### The `TcpNetConnectionSupport` Strategy Interface
茶陵後's avatar
茶陵後 已提交
1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325

The following listing shows the definition of the `TcpNetConnectionSupport` strategy interface:

```
public interface TcpNetConnectionSupport {

    TcpNetConnection createNewConnection(Socket socket,
            boolean server, boolean lookupHost,
            ApplicationEventPublisher applicationEventPublisher,
            String connectionFactoryName) throws Exception;

}
```

This interface is invoked to create objects of type `TcpNetConnection` (or its subclasses).
The framework provides a single implementation (`DefaultTcpNetConnectionSupport`), which, by default, creates simple `TcpNetConnection` objects.
It has two properties: `pushbackCapable` and `pushbackBufferSize`.
When push back is enabled, the implementation returns a subclass that wraps the connection’s `InputStream` in a `PushbackInputStream`.
Aligned with the `PushbackInputStream` default, the buffer size defaults to 1.
This lets deserializers “unread” (push back) bytes into the stream.
The following trivial example shows how it might be used in a delegating deserializer that “peeks” at the first byte to determine which deserializer to invoke:

```
public class CompositeDeserializer implements Deserializer<byte[]> {

    private final ByteArrayStxEtxSerializer stxEtx = new ByteArrayStxEtxSerializer();

    private final ByteArrayCrLfSerializer crlf = new ByteArrayCrLfSerializer();

    @Override
    public byte[] deserialize(InputStream inputStream) throws IOException {
        PushbackInputStream pbis = (PushbackInputStream) inputStream;
        int first = pbis.read();
        if (first < 0) {
            throw new SoftEndOfStreamException();
        }
        pbis.unread(first);
        if (first == ByteArrayStxEtxSerializer.STX) {
            this.receivedStxEtx = true;
            return this.stxEtx.deserialize(pbis);
        }
        else {
            this.receivedCrLf = true;
            return this.crlf.deserialize(pbis);
        }
    }

}
```

1326
##### The `TcpNioConnectionSupport` Strategy Interface
茶陵後's avatar
茶陵後 已提交
1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347

The following listing shows the definition of the `TcpNioConnectionSupport` strategy interface:

```
public interface TcpNioConnectionSupport {

    TcpNioConnection createNewConnection(SocketChannel socketChannel,
            boolean server, boolean lookupHost,
            ApplicationEventPublisher applicationEventPublisher,
            String connectionFactoryName) throws Exception;

}
```

This interface is invoked to create `TcpNioConnection` objects (or objects from subclasses).
Spring Integration provides two implementations: `DefaultTcpNioSSLConnectionSupport` and `DefaultTcpNioConnectionSupport`.
Which one is used depends on whether SSL is in use.
A common use case is to subclass `DefaultTcpNioSSLConnectionSupport` and override `postProcessSSLEngine`.
See the [SSL client authentication example](#ssl-client-authentication-example).
As with the `DefaultTcpNetConnectionSupport`, these implementations also support push back.

1348
#### Example: Enabling SSL Client Authentication
茶陵後's avatar
茶陵後 已提交
1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390

To enable client certificate authentication when you use SSL, the technique depends on whether you use NIO.
When you do not NIO , provide a custom `TcpSocketSupport` implementation to post-process the server socket:

```
serverFactory.setTcpSocketSupport(new DefaultTcpSocketSupport() {

    @Override
    public void postProcessServerSocket(ServerSocket serverSocket) {
        ((SSLServerSocket) serverSocket).setNeedClientAuth(true);
    }

});
```

(When you use XML configuration, provide a reference to your bean by setting the `socket-support` attribute).

When you use NIO, provide a custom `TcpNioSslConnectionSupport` implementation to post-process the `SSLEngine`, as the following example shows:

```
@Bean
public DefaultTcpNioSSLConnectionSupport tcpNioConnectionSupport() {
    return new DefaultTcpNioSSLConnectionSupport(serverSslContextSupport) {

            @Override
            protected void postProcessSSLEngine(SSLEngine sslEngine) {
                sslEngine.setNeedClientAuth(true);
            }

    }
}

@Bean
public TcpNioServerConnectionFactory server() {
    ...
    serverFactory.setTcpNioConnectionSupport(tcpNioConnectionSupport());
    ...
}
```

(When you use XML configuration, since version 4.3.7, provide a reference to your bean by setting the `nio-connection-support` attribute).

1391
### IP Configuration Attributes
茶陵後's avatar
茶陵後 已提交
1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512

The following table describes attributes that you can set to configure IP connections:

|      Attribute Name       |Client?|Server?|Allowed Values |                                                                                                                    Attribute Description                                                                                                                    |
|---------------------------|-------|-------|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|          `type`           |   Y   |   Y   |client, server |                                                                                             Determines whether the connection factory is a client or a server.                                                                                              |
|          `host`           |   Y   |   N   |               |                                                                                                       The host name or IP address of the destination.                                                                                                       |
|          `port`           |   Y   |   Y   |               |                                                                                                                          The port.                                                                                                                          |
|       `serializer`        |   Y   |   Y   |               |                                                                         An implementation of `Serializer` used to serialize the payload.<br/>Defaults to `ByteArrayCrLfSerializer`                                                                          |
|      `deserializer`       |   Y   |   Y   |               |                                                                       An implementation of `Deserializer` used to deserialize the payload.<br/>Defaults to `ByteArrayCrLfSerializer`                                                                        |
|        `using-nio`        |   Y   |   Y   |`true`, `false`|                                           Whether or not connection uses NIO.<br/>Refer to the `java.nio` package for more information.<br/>See [About Non-blocking I/O (NIO)](#note-nio).<br/>Default: `false`.                                            |
|  `using-direct-buffers`   |   Y   |   N   |`true`, `false`|                                When using NIO, whether or not the connection uses direct buffers.<br/>Refer to the `java.nio.ByteBuffer` documentation for more information.<br/>Must be `false` if `using-nio` is `false`.                                 |
|     `apply-sequence`      |   Y   |   Y   |`true`, `false`|When you use NIO, it may be necessary to resequence messages.<br/>When this attribute is set to `true`, `correlationId` and `sequenceNumber` headers are added to received messages.<br/>See [About Non-blocking I/O (NIO)](#note-nio).<br/>Default: `false`.|
|       `so-timeout`        |   Y   |   Y   |               |                                            Defaults to `0` (infinity), except for server connection factories with `single-use="true"`.<br/>In that case, it defaults to the default reply timeout (10 seconds).                                            |
|   `so-send-buffer-size`   |   Y   |   Y   |               |                                                                                                        See `java.net.Socket.``setSendBufferSize()`.                                                                                                         |
| `so-receive-buffer-size`  |   Y   |   Y   |               |                                                                                                       See `java.net.Socket.``setReceiveBufferSize()`.                                                                                                       |
|      `so-keep-alive`      |   Y   |   Y   |`true`, `false`|                                                                                                            See `java.net.Socket.setKeepAlive()`.                                                                                                            |
|        `so-linger`        |   Y   |   Y   |               |                                                                                  Sets `linger` to `true` with the supplied value.<br/>See `java.net.Socket.setSoLinger()`.                                                                                  |
|     `so-tcp-no-delay`     |   Y   |   Y   |`true`, `false`|                                                                                                           See `java.net.Socket.setTcpNoDelay()`.                                                                                                            |
|    `so-traffic-class`     |   Y   |   Y   |               |                                                                                                         See `java.net.Socket.``setTrafficClass()`.                                                                                                          |
|      `local-address`      |   N   |   Y   |               |                                                                              On a multi-homed system, specifies an IP address for the interface to which the socket is bound.                                                                               |
|      `task-executor`      |   Y   |   Y   |               |      Specifies a specific executor to be used for socket handling.<br/>If not supplied, an internal cached thread executor is used.<br/>Needed on some platforms that require the use of specific task executors, such as a `WorkManagerTaskExecutor`.      |
|       `single-use`        |   Y   |   Y   |`true`, `false`|                                                                 Specifies whether a connection can be used for multiple messages.<br/>If `true`, a new connection is used for each message.                                                                 |
|        `pool-size`        |   N   |   N   |               |                                         This attribute is no longer used.<br/>For backward compatibility, it sets the backlog, but you should use `backlog` to specify the connection backlog in server factories.                                          |
|         `backlog`         |   N   |   Y   |               |                                                                                                      Sets the connection backlog for server factories.                                                                                                      |
|       `lookup-host`       |   Y   |   Y   |`true`, `false`|                                     Specifies whether reverse lookups are done on IP addresses to convert to host names for use in message headers.<br/>If false, the IP address is used instead.<br/>Default: `true`.                                      |
|`interceptor-factory-chain`|   Y   |   Y   |               |                                                                                                    See [TCP Connection Interceptors](#ip-interceptors).                                                                                                     |
|   `ssl-context-support`   |   Y   |   Y   |               |                                                                                                             See `[SSL/TLS Support](#ssl-tls)`.                                                                                                              |
| `socket-factory-support`  |   Y   |   Y   |               |                                                                                                             See `[SSL/TLS Support](#ssl-tls)`.                                                                                                              |
|     `socket-support`      |   Y   |   Y   |               |                                                                                                              See [SSL/TLS Support](#ssl-tls).                                                                                                               |
| `nio-connection-support`  |   Y   |   Y   |               |                                                                                                    See [Advanced Techniques](#tcp-advanced-techniques).                                                                                                     |
|       `read-delay`        |   Y   |   Y   |   long \> 0   |                                       The delay (in milliseconds) before retrying a read after the previous attempt failed due to insufficient threads.<br/>Default: 100.<br/>Only applies if `using-nio` is `true`.                                        |

The following table describes attributes that you can set to configure UDP inbound channel adapters:

|     Attribute Name     |Allowed Values |                                                                                                                               Attribute Description                                                                                                                                |
|------------------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|         `port`         |               |                                                                                                                       The port on which the adapter listens.                                                                                                                       |
|      `multicast`       |`true`, `false`|                                                                                                                   Whether or not the UDP adapter uses multicast.                                                                                                                   |
|  `multicast-address`   |               |                                                                                                     When multicast is true, the multicast address to which the adapter joins.                                                                                                      |
|      `pool-size`       |               |                                                                          Specifies how many packets can be handled concurrently.<br/>It only applies if task-executor is not configured.<br/>Default: 5.                                                                           |
|     task-executor      |               |Specifies a specific executor to be used for socket handling.<br/>If not supplied, an internal pooled executor is used.<br/>Needed on some platforms that require the use of specific task executors such as a `WorkManagerTaskExecutor`.<br/>See pool-size for thread requirements.|
| `receive-buffer-size`  |               |   The size of the buffer used to receive `DatagramPackets`.<br/>Usually set to the maximum transmission unit (MTU) size.<br/>If a smaller buffer is used than the size of the sent packet, truncation can occur.<br/>You can detect this by using the `check-length` attribute..   |
|     `check-length`     |`true`, `false`|                                                                               Whether or not a UDP adapter expects a data length field in the packet received.<br/>Used to detect packet truncation.                                                                               |
|      `so-timeout`      |               |                                                                                                See the `setSoTimeout()` methods in `java.net.DatagramSocket` for more information.                                                                                                 |
| `so-send-buffer-size`  |               |                                                                          Used for UDP acknowledgment packets.<br/>See the setSendBufferSize() methods in `java.net.DatagramSocket` for more information.                                                                           |
|`so-receive-buffer-size`|               |                                                                                                     See `java.net.DatagramSocket.setReceiveBufferSize()` for more information.                                                                                                     |
|    `local-address`     |               |                                                                                          On a multi-homed system, specifies an IP address for the interface to which the socket is bound.                                                                                          |
|    `error-channel`     |               |                                                              If a downstream component throws an exception, the `MessagingException` message that contains the exception and failed message is sent to this channel.                                                               |
|     `lookup-host`      |`true`, `false`|                                                Specifies whether reverse lookups are done on IP addresses to convert to host names for use in message headers.<br/>If `false`, the IP address is used instead.<br/>Default: `true`.                                                |

The following table describes attributes that you can set to configure UDP outbound channel adapters:

|     Attribute Name     |Allowed Values |                                                                                                                                                                  Attribute Description                                                                                                                                                                   |
|------------------------|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|         `host`         |               |                                                                                                                          The host name or ip address of the destination.<br/>For multicast udp adapters, the multicast address.                                                                                                                          |
|         `port`         |               |                                                                                                                                                               The port on the destination.                                                                                                                                                               |
|      `multicast`       |`true`, `false`|                                                                                                                                                      Whether or not the udp adapter uses multicast.                                                                                                                                                      |
|     `acknowledge`      |`true`, `false`|                                                                  Whether or not a UDP adapter requires an acknowledgment from the destination.<br/>When enabled, it requires setting the following four attributes: `ack-host`, `ack-port`, `ack-timeout`, and `min-acks-for- success`.                                                                  |
|       `ack-host`       |               |                                                            When `acknowledge` is `true`, indicates the host or IP address to which the acknowledgment should be sent.<br/>Usually the current host, but may be different — for example, when Network Address Translation (NAT) is being used.                                                            |
|       `ack-port`       |               |                                                                                                  When `acknowledge` is `true`, indicates the port to which the acknowledgment should be sent.<br/>The adapter listens on this port for acknowledgments.                                                                                                  |
|     `ack-timeout`      |               |                                                                            When `acknowledge` is `true`, indicates the time in milliseconds that the adapter waits for an acknowledgment.<br/>If an acknowledgment is not received in time, the adapter throws an exception.                                                                             |
|`min-acks-for- success` |               |                                                                                                        Defaults to 1.<br/>For multicast adapters, you can set this to a larger value, which requires acknowledgments from multiple destinations.                                                                                                         |
|     `check-length`     |`true`, `false`|                                                                                                                             Whether or not a UDP adapter includes a data length field in the packet sent to the destination.                                                                                                                             |
|     `time-to-live`     |               |                                                                             For multicast adapters, specifies the time-to-live attribute for the `MulticastSocket`.<br/>Controls the scope of the multicasts.<br/>Refer to the Java API documentation for more information.                                                                              |
|      `so-timeout`      |               |                                                                                                                                        See `java.net.DatagramSocket` setSoTimeout() methods for more information.                                                                                                                                        |
| `so-send-buffer-size`  |               |                                                                                                                                 See the `setSendBufferSize()` methods in `java.net.DatagramSocket` for more information.                                                                                                                                 |
|`so-receive-buffer-size`|               |                                                                                                           Used for UDP acknowledgment packets.<br/>See the `setReceiveBufferSize()` methods in `java.net.DatagramSocket` for more information.                                                                                                           |
|     local-address      |               |                                                      On a multi-homed system, for the UDP adapter, specifies an IP address for the interface to which the socket is bound for reply messages.<br/>For a multicast adapter, it also determines which interface the multicast packets are sent over.                                                       |
|    `task-executor`     |               |Specifies a specific executor to be used for acknowledgment handling.<br/>If not supplied, an internal single threaded executor is used.<br/>Needed on some platforms that require the use of specific task executors, such as a `WorkManagerTaskExecutor`.<br/>One thread is dedicated to handling acknowledgments (if the `acknowledge` option is true).|
|`destination-expression`|SpEL expression|                                                                                                            A SpEL expression to be evaluated to determine which `SocketAddress` to use as a destination address for the outgoing UDP packets.                                                                                                            |
|  `socket-expression`   |SpEL expression|                                                                                                                        A SpEL expression to be evaluated to determine which datagram socket use for sending outgoing UDP packets.                                                                                                                        |

The following table describes attributes that you can set to configure TCP inbound channel adapters:

|   Attribute Name   |Allowed Values |                                                                                                                                                Attribute Description                                                                                                                                                |
|--------------------|---------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|     `channel`      |               |                                                                                                                                   The channel to which inbound messages is sent.                                                                                                                                    |
|`connection-factory`|               |                    If the connection factory has a type of `server`, the factory is “owned” by this adapter.<br/>If it has a type of `client`, it is “owned” by an outbound channel adapter, and this adapter receives any incoming messages on the connection created by the outbound adapter.                     |
|  `error-channel`   |               |                                                                           If an exception is thrown by a downstream component, the `MessagingException` message containing the exception and the failed message is sent to this channel.                                                                            |
|   `client-mode`    |`true`, `false`|When `true`, the inbound adapter acts as a client with respect to establishing the connection and then receiving incoming messages on that connection.<br/>Default: `false`.<br/>See also `retry-interval` and `scheduler`.<br/>The connection factory must be of type `client` and have `single-use` set to `false`.|
|  `retry-interval`  |               |                                                                           When in `client-mode`, specifies the number of milliseconds to wait between connection attempts or after a connection failure.<br/>Default: 60000 (60 seconds).                                                                           |
|    `scheduler`     |`true`, `false`|         Specifies a `TaskScheduler` to use for managing the `client-mode` connection.<br/>If not specified, it defaults to the global Spring Integration `taskScheduler` bean, which has a default pool size of 10.<br/>See [Configuring the Task Scheduler](./configuration.html#namespace-taskscheduler).         |

The following table describes attributes that you can set to configure TCP outbound channel adapters:

|   Attribute Name   |Allowed Values |                                                                                                                                                           Attribute Description                                                                                                                                                            |
|--------------------|---------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|     `channel`      |               |                                                                                                                                               The channel on which outbound messages arrive.                                                                                                                                               |
|`connection-factory`|               |                         If the connection factory has a type of `client`, the factory is “owned” by this adapter.<br/>If it has a type of `server`, it is “owned” by an inbound channel adapter, and this adapter tries to correlate messages to the connection on which an original inbound message was received.                         |
|   `client-mode`    |`true`, `false`|When `true`, the outbound adapter tries to establish the connection as soon as it is started.<br/>When `false`, the connection is established when the first message is sent.<br/>Default: `false`.<br/>See also `retry-interval` and `scheduler`.<br/>The connection factory must be of type `client` and have `single-use` set to `false`.|
|  `retry-interval`  |               |                                                                                      When in `client-mode`, specifies the number of milliseconds to wait between connection attempts or after a connection failure.<br/>Default: 60000 (60 seconds).                                                                                       |
|    `scheduler`     |`true`, `false`|                    Specifies a `TaskScheduler` to use for managing the `client-mode` connection.<br/>If not specified, it defaults to the global Spring Integration `taskScheduler` bean, which has a default pool size of 10.<br/>See [Configuring the Task Scheduler](./configuration.html#namespace-taskscheduler).                     |

The following table describes attributes that you can set to configure TCP inbound gateways:

|   Attribute Name   |Allowed Values |                                                                                                                                                        Attribute Description                                                                                                                                                        |
|--------------------|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|`connection-factory`|               |                                                                                                                                           The connection factory must be of type server.                                                                                                                                            |
| `request-channel`  |               |                                                                                                                                          The channel to which incoming messages are sent.                                                                                                                                           |
|  `reply-channel`   |               |                                                                                            The channel on which reply messages may arrive.<br/>Usually, replies arrive on a temporary reply channel added to the inbound message header.                                                                                            |
|  `reply-timeout`   |               |                                                                                                                   The time in milliseconds for which the gateway waits for a reply.<br/>Default: 1000 (1 second).                                                                                                                   |
|  `error-channel`   |               |                                             If an exception is thrown by a downstream component, the `MessagingException` message containing the exception and the failed message is sent to this channel.<br/>Any reply from that flow is then returned as a response by the gateway.                                              |
|   `client-mode`    |`true`, `false`|When `true`, the inbound gateway acts as a client with respect to establishing the connection and then receiving (and replying to) incoming messages on that connection.<br/>Default: false.<br/>See also `retry-interval` and `scheduler`.<br/>The connection factory must be of type `client` and have `single-use` set to `false`.|
|  `retry-interval`  |               |                                                                                   When in `client-mode`, specifies the number of milliseconds to wait between connection attempts or after a connection failure.<br/>Default: 60000 (60 seconds).                                                                                   |
|    `scheduler`     |`true`, `false`|                 Specifies a `TaskScheduler` to use for managing the `client-mode` connection.<br/>If not specified, it defaults to the global Spring Integration `taskScheduler` bean, which has a default pool size of 10.<br/>See [Configuring the Task Scheduler](./configuration.html#namespace-taskscheduler).                 |

The following table describes attributes that you can set to configure TCP outbound gateways:

|       Attribute Name        |Allowed Values|                                                                                                                         Attribute Description                                                                                                                         |
|-----------------------------|--------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|    `connection-factory`     |              |                                                                                                           The connection factory must be of type `client`.                                                                                                            |
|      `request-channel`      |              |                                                                                                            The channel on which outgoing messages arrive.                                                                                                             |
|       `reply-channel`       |              |                                                                                                      Optional.<br/>The channel to which reply messages are sent.                                                                                                      |
|      `remote-timeout`       |              |The time in milliseconds for which the gateway waits for a reply from the remote system.<br/>Mutually exclusive with `remote-timeout-expression`.<br/>Default: 10000 (10 seconds).<br/>Note: In versions prior to 4.2 this value defaulted to `reply-timeout` (if set).|
| `remote-timeout-expression` |              |                              A SpEL expression that is evaluated against the message to determine the time in milliseconds for which the gateway waits for a reply from the remote system.<br/>Mutually exclusive with `remote-timeout`.                              |
|      `request-timeout`      |              |                                                          If a single-use connection factory is not being used, the time in milliseconds for which the gateway waits to get access to the shared connection.                                                           |
|       `reply-timeout`       |              |                             The time in milliseconds for which the gateway waits when sending the reply to the reply-channel.<br/>Only applies if the reply-channel might block (such as a bounded QueueChannel that is currently full).                              |
|           `async`           |              |                                                                                 Release the sending thread after the send; the reply (or error) will be sent on the receiving thread.                                                                                 |
|`unsolicited``MessageChannel`|              |                                                                                                   A channel to which to send unsolicited messages and late replies.                                                                                                   |

1513
### IP Message Headers
茶陵後's avatar
茶陵後 已提交
1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542

IP Message Headers

This module uses the following `MessageHeader` instances:

|      Header Name      |      IpHeaders Constant       |                                                                                                                                         Description                                                                                                                                         |
|-----------------------|-------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|     `ip_hostname`     |          `HOSTNAME`           |                                                                              The host name from which a TCP message or UDP packet was received.<br/>If `lookupHost` is `false`, this contains the IP address.                                                                               |
|     `ip_address`      |         `IP_ADDRESS`          |                                                                                                             The IP address from which a TCP message or UDP packet was received.                                                                                                             |
|       `ip_port`       |            `PORT`             |                                                                                                                              The remote port for a UDP packet.                                                                                                                              |
| ip\_localInetAddress  |      `IP_LOCAL_ADDRESS`       |                                                                                                       The local `InetAddress` to which the socket is connected (since version 4.2.5).                                                                                                       |
|      `ip_ackTo`       |         `ACKADDRESS`          |                                                                  The remote IP address to which UDP application-level acknowledgments are sent.<br/>The framework includes acknowledgment information in the data packet.                                                                   |
|      `ip_ackId`       |           `ACK_ID`            |                                                                            A correlation ID for UDP application-level acknowledgments.<br/>The framework includes acknowledgment information in the data packet.                                                                            |
|  `ip_tcp_remotePort`  |         `REMOTE_PORT`         |                                                                                                                            The remote port for a TCP connection.                                                                                                                            |
|   `ip_connectionId`   |        `CONNECTION_ID`        |A unique identifier for a TCP connection.<br/>Set by the framework for inbound messages.<br/>When sending to a server-side inbound channel adapter or replying to an inbound gateway, this header is required so that the endpoint can determine the connection to which to send the message.|
|`ip_actualConnectionId`|    `ACTUAL_CONNECTION_ID`     |                                                                            For information only.<br/>When using a cached or failover client connection factory, it contains the actual underlying connection ID.                                                                            |
|     `contentType`     |`MessageHeaders.``CONTENT_TYPE`|                                         An optional content type for inbound messages<br/>Described after this table.<br/>Note that, unlike the other header constants, this constant is in the `MessageHeaders` class, not the `IpHeaders` class.                                          |

For inbound messages, `ip_hostname`, `ip_address`, `ip_tcp_remotePort`, and `ip_connectionId` are mapped by the default `TcpHeaderMapper`.
If you set the mapper’s `addContentTypeHeader` property to `true`, the mapper sets the `contentType` header (`application/octet-stream;charset="UTF-8"`, by default).
You can change the default by setting the `contentType` property.
You can add additional headers by subclassing `TcpHeaderMapper` and overriding the `supplyCustomHeaders` method.
For example, when you use SSL, you can add properties of the `SSLSession` by obtaining the session object from the `TcpConnection` object, which is provided as an argument to the `supplyCustomHeaders` method.

For outbound messages, `String` payloads are converted to `byte[]` with the default (`UTF-8`) charset.
Set the `charset` property to change the default.

When customizing the mapper properties or subclassing, declare the mapper as a bean and provide an instance to the connection factory by using the `mapper` property.

1543
### Annotation-Based Configuration
茶陵後's avatar
茶陵後 已提交
1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627

The following example from the samples repository shows some of the configuration options available when you use annotations instead of XML:

```
@EnableIntegration (1)
@IntegrationComponentScan (2)
@Configuration
public static class Config {

    @Value(${some.port})
    private int port;

    @MessagingGateway(defaultRequestChannel="toTcp") (3)
    public interface Gateway {

        String viaTcp(String in);

    }

    @Bean
    @ServiceActivator(inputChannel="toTcp") (4)
    public MessageHandler tcpOutGate(AbstractClientConnectionFactory connectionFactory) {
        TcpOutboundGateway gate = new TcpOutboundGateway();
        gate.setConnectionFactory(connectionFactory);
        gate.setOutputChannelName("resultToString");
        return gate;
    }

    @Bean (5)
    public TcpInboundGateway tcpInGate(AbstractServerConnectionFactory connectionFactory)  {
        TcpInboundGateway inGate = new TcpInboundGateway();
        inGate.setConnectionFactory(connectionFactory);
        inGate.setRequestChannel(fromTcp());
        return inGate;
    }

    @Bean
    public MessageChannel fromTcp() {
        return new DirectChannel();
    }

    @MessageEndpoint
    public static class Echo { (6)

        @Transformer(inputChannel="fromTcp", outputChannel="toEcho")
        public String convert(byte[] bytes) {
            return new String(bytes);
        }

        @ServiceActivator(inputChannel="toEcho")
        public String upCase(String in) {
            return in.toUpperCase();
        }

        @Transformer(inputChannel="resultToString")
        public String convertResult(byte[] bytes) {
            return new String(bytes);
        }

    }

    @Bean
    public AbstractClientConnectionFactory clientCF() { (7)
        return new TcpNetClientConnectionFactory("localhost", this.port);
    }

    @Bean
    public AbstractServerConnectionFactory serverCF() { (8)
        return new TcpNetServerConnectionFactory(this.port);
    }

}
```

|**1**|                                         Standard Spring Integration annotation enabling the infrastructure for an integration application.                                         |
|-----|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|**2**|                                                                    Searches for `@MessagingGateway` interfaces.                                                                    |
|**3**|                  The entry point to the client-side of the flow.<br/>The calling application can use `@Autowired` for this `Gateway` bean and invoke its method.                   |
|**4**|Outbound endpoints consist of a `MessageHandler` and a consumer that wraps it.<br/>In this scenario, the `@ServiceActivator` configures the endpoint, according to the channel type.|
|**5**|                           Inbound endpoints (in the TCP/UDP module) are all message-driven and so only need to be declared as simple `@Bean` instances.                            |
|**6**|  This class provides a number of POJO methods for use in this sample flow (a `@Transformer` and `@ServiceActivator` on the server side and a `@Transformer` on the client side).   |
|**7**|                                                                        The client-side connection factory.                                                                         |
|**8**|                                                                        The server-side connection factory.                                                                         |

1628
### Using the Java DSL for TCP Components
茶陵後's avatar
茶陵後 已提交
1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687

DSL support for TCP components includes specs for adapters and gateways, the `Tcp` class with factory methods to create connection factory beans, and the `TcpCodecs` class with factory methods to create serializers and deserializers.
Refer to their javadocs for more information.

Here are some examples of using the DSL to configure flows using the DSL.

Example 1. Server Adapter Flow

```
@Bean
public IntegrationFlow server() {
    return IntegrationFlows.from(Tcp.inboundAdapter(Tcp.netServer(1234)
                            .deserializer(TcpCodecs.lengthHeader1())
                            .backlog(30))
                        .errorChannel("tcpIn.errorChannel")
                        .id("tcpIn"))
            .transform(Transformers.objectToString())
            .channel("tcpInbound")
            .get();
}
```

Example 2. Client Adapter Flow

```
@Bean
public IntegrationFlow client() {
    return f -> f.handle(Tcp.outboundAdapter(Tcp.nioClient("localhost", 1234)
                        .serializer(TcpCodecs.lengthHeader1())));
}
```

Example 3. Server Gateway Flow

```
@Bean
public IntegrationFlow server() {
    return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1234)
                            .deserializer(TcpCodecs.lengthHeader1())
                            .serializer(TcpCodecs.lengthHeader1())
                            .backlog(30))
                        .errorChannel("tcpIn.errorChannel")
                        .id("tcpIn"))
            .transform(Transformers.objectToString())
            .channel("tcpInbound")
            .get();
}
```

Example 4. Client Gateway Flow

```
@Bean
public IntegrationFlow client() {
    return f -> f.handle(Tcp.outboundGateway(Tcp.nioClient("localhost", 1234)
                        .deserializer(TcpCodecs.lengthHeader1())
                        .serializer(TcpCodecs.lengthHeader1())));
}
```