# Stream Support 

## Stream Support

In many cases, application data is obtained from a stream.
It is not recommended to send a reference to a stream as a message payload to a consumer.
Instead, messages are created from data that is read from an input stream, and message payloads are written to an output stream one by one.

You need to include this dependency in your project:

Maven

```
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
    <version>5.5.9</version>
</dependency>
```

Gradle

```
compile "org.springframework.integration:spring-integration-stream:5.5.9"
```

### Reading from Streams

Spring Integration provides two adapters for streams.
Both `ByteStreamReadingMessageSource` and `CharacterStreamReadingMessageSource` implement `MessageSource`.
When you configure one of these within a channel-adapter element, you can set the polling period, and the message bus can automatically detect and schedule them.
The byte stream version requires an `InputStream`, and the character stream version requires a `Reader` as the single constructor argument.
The `ByteStreamReadingMessageSource` also accepts the `bytesPerMessage` property to determine how many bytes it tries to read into each `Message`.
The default value is 1024.
The following example configures both sources; the byte stream source creates messages that each contain up to 2048 bytes:

```
<bean class="org.springframework.integration.stream.ByteStreamReadingMessageSource">
  <constructor-arg ref="someInputStream"/>
  <property name="bytesPerMessage" value="2048"/>
</bean>

<bean class="org.springframework.integration.stream.CharacterStreamReadingMessageSource">
  <constructor-arg ref="someReader"/>
</bean>
```
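To make the effect of `bytesPerMessage` concrete, the following plain-JDK sketch splits an input stream into payloads of at most a given size. It is an illustration only, not Spring Integration's implementation: the class `ChunkedByteReaderSketch` and its methods are hypothetical, and the sketch assumes that a short read simply yields a shorter payload.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-JDK illustration only; this is not Spring Integration code.
final class ChunkedByteReaderSketch {

    // Reads at most bytesPerMessage bytes; returns null when nothing could be read.
    static byte[] readChunk(InputStream in, int bytesPerMessage) {
        try {
            byte[] buffer = new byte[bytesPerMessage];
            int read = in.read(buffer, 0, buffer.length);
            if (read <= 0) {
                return null;
            }
            // Assumption of this sketch: trim the payload when fewer bytes were available.
            return Arrays.copyOf(buffer, read);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Drains the stream into a list of payloads, one per simulated poll.
    static List<byte[]> readAll(InputStream in, int bytesPerMessage) {
        List<byte[]> payloads = new ArrayList<>();
        byte[] chunk;
        while ((chunk = readChunk(in, bytesPerMessage)) != null) {
            payloads.add(chunk);
        }
        return payloads;
    }

    public static void main(String[] args) {
        InputStream in = new ByteArrayInputStream(new byte[5000]);
        for (byte[] payload : readAll(in, 2048)) {
            System.out.println(payload.length); // prints 2048, 2048, 904
        }
    }
}
```

With an in-memory stream of 5000 bytes and a chunk size of 2048, the sketch produces three payloads, the last one shorter than the others.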

The `CharacterStreamReadingMessageSource` wraps the reader in a `BufferedReader` (if it is not one already).
You can set the buffer size used by the buffered reader in the second constructor argument.
Starting with version 5.0, a third constructor argument (`blockToDetectEOF`) controls the behavior of the `CharacterStreamReadingMessageSource`.
When `false` (the default), the `receive()` method checks whether the reader is `ready()` and returns null if not.
EOF (end of file) is not detected in this case.
When `true`, the `receive()` method blocks until data is available or EOF is detected on the underlying stream.
When EOF is detected, a `StreamClosedEvent` (application event) is published.
You can consume this event with a bean that implements `ApplicationListener<StreamClosedEvent>`.
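The difference between the two modes can be illustrated with plain `java.io` classes. The sketch below is not the library's code; `EofDetectionSketch`, `receiveIfReady`, `receiveBlocking`, and the `onEof` callback are hypothetical names, and the callback merely stands in for publishing a `StreamClosedEvent`. An in-process pipe plays the role of the underlying stream.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Plain-JDK illustration only; this is not Spring Integration code.
final class EofDetectionSketch {

    // Non-blocking style: null means "nothing to read right now".
    // A closed stream with no buffered data looks the same, so EOF goes unnoticed.
    static String receiveIfReady(BufferedReader reader) {
        try {
            return reader.ready() ? reader.readLine() : null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Blocking style: readLine() waits for a line or for EOF; null means EOF.
    static String receiveBlocking(BufferedReader reader, Runnable onEof) {
        try {
            String line = reader.readLine();
            if (line == null) {
                onEof.run(); // stand-in for publishing an event
            }
            return line;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Walks through both styles on a pipe and records what each call observed.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        try {
            PipedWriter writer = new PipedWriter();
            BufferedReader reader = new BufferedReader(new PipedReader(writer));
            log.add(String.valueOf(receiveIfReady(reader))); // no data yet
            writer.write("hello\n");
            writer.close();
            log.add(String.valueOf(receiveIfReady(reader))); // the buffered line
            log.add(String.valueOf(receiveIfReady(reader))); // closed, but not reported
            receiveBlocking(reader, () -> log.add("EOF"));   // EOF is detected here
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [null, hello, null, EOF]
    }
}
```

In the sketch, only the blocking call can tell a closed stream apart from a stream that has no data at the moment, which is the trade-off described above.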

|   |To facilitate EOF detection, the poller thread blocks in the `receive()` method until either data arrives or EOF is detected.|
|---|-----------------------------------------------------------------------------------------------------------------------------|

|   |The poller continues to publish an event on each poll once EOF has been detected.<br/>The application listener can stop the adapter to prevent this.<br/>The event is published on the poller thread.<br/>Stopping the adapter causes the thread to be interrupted.<br/>If you intend to perform some interruptible task after stopping the adapter, you must either perform the `stop()` on a different thread or use a different thread for those downstream activities.<br/>Note that sending to a `QueueChannel` is interruptible, so, if you wish to send a message from the listener, do it before stopping the adapter.|
|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

This facilitates “piping” or redirecting data to `stdin`, as the following two examples show:

```
cat myfile.txt | java -jar my.jar
```

```
java -jar my.jar < foo.txt
```

This approach lets the application stop when the pipe is closed.

Four convenient factory methods are available:

```
public static final CharacterStreamReadingMessageSource stdin() { ... }

public static final CharacterStreamReadingMessageSource stdin(String charsetName) { ... }

public static final CharacterStreamReadingMessageSource stdinPipe() { ... }

public static final CharacterStreamReadingMessageSource stdinPipe(String charsetName) { ... }
```
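Two of these methods take a charset name. As a rough illustration of what that choice means when standard input is read as characters, the following plain-JDK sketch decodes the same bytes with two different charsets. It does not show the library's code; `CharsetReaderSketch`, `readerFor`, and `firstLine` are hypothetical names, and an in-memory stream stands in for `System.in`.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Plain-JDK illustration only; this is not Spring Integration code.
final class CharsetReaderSketch {

    // Wraps a byte stream in a reader that decodes with the named charset.
    static BufferedReader readerFor(InputStream in, String charsetName) {
        try {
            return new BufferedReader(new InputStreamReader(in, charsetName));
        } catch (IOException e) { // includes UnsupportedEncodingException
            throw new UncheckedIOException(e);
        }
    }

    // Reads the first line of the stream using the named charset.
    static String firstLine(InputStream in, String charsetName) {
        try {
            return readerFor(in, charsetName).readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] latin1 = "h\u00e9llo\n".getBytes(StandardCharsets.ISO_8859_1);
        // Decoding with the charset the bytes were written in recovers the text.
        System.out.println("h\u00e9llo".equals(
                firstLine(new ByteArrayInputStream(latin1), "ISO-8859-1")));
        // Decoding with a different charset does not.
        System.out.println("h\u00e9llo".equals(
                firstLine(new ByteArrayInputStream(latin1), "UTF-8")));
    }
}
```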

### Writing to Streams

For target streams, you can use either of two implementations: `ByteStreamWritingMessageHandler` or `CharacterStreamWritingMessageHandler`.
Each requires a single constructor argument (`OutputStream` for byte streams or `Writer` for character streams), and each provides a second constructor that adds the optional `bufferSize`.
Since both of these ultimately implement the `MessageHandler` interface, you can reference them from a `channel-adapter` configuration, as described in [Channel Adapter](./channel-adapter.html#channel-adapter).

```
<bean class="org.springframework.integration.stream.ByteStreamWritingMessageHandler">
  <constructor-arg ref="someOutputStream"/>
  <constructor-arg value="1024"/>
</bean>

<bean class="org.springframework.integration.stream.CharacterStreamWritingMessageHandler">
  <constructor-arg ref="someWriter"/>
</bean>
```
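The idea of writing message payloads to a target stream one by one, through an optionally sized buffer, can be sketched with plain `java.io` classes. This is an illustration under assumptions, not the handlers' actual implementation: `PayloadWriterSketch` and its `handle` method are hypothetical, and flushing after every payload is a choice made by this sketch so that output becomes visible immediately.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;

// Plain-JDK illustration only; this is not Spring Integration code.
final class PayloadWriterSketch {

    private final BufferedWriter writer;

    // bufferSize plays the role of the optional second constructor argument.
    PayloadWriterSketch(Writer target, int bufferSize) {
        this.writer = new BufferedWriter(target, bufferSize);
    }

    // Writes one payload as text and flushes it to the target.
    void handle(Object payload) {
        try {
            writer.write(String.valueOf(payload));
            writer.flush(); // assumption of this sketch: flush per payload
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        StringWriter target = new StringWriter();
        PayloadWriterSketch handler = new PayloadWriterSketch(target, 1024);
        String[] payloads = {"foo", "bar"};
        for (String payload : payloads) {
            handler.handle(payload);
        }
        System.out.println(target); // prints foobar
    }
}
```

Because nothing is appended between payloads, consecutive payloads run together in the output.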

### Stream Namespace Support

Spring Integration defines a namespace to reduce the configuration needed for stream-related channel adapters.
The following schema locations are needed to use it:

```
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:beans="http://www.springframework.org/schema/beans"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
      https://www.springframework.org/schema/beans/spring-beans.xsd
      http://www.springframework.org/schema/integration/stream
      https://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">
```

The following code snippet shows the configuration options supported by the inbound channel adapter:

```
<int-stream:stdin-channel-adapter id="adapterWithDefaultCharset"/>

<int-stream:stdin-channel-adapter id="adapterWithProvidedCharset" charset="UTF-8"/>
```

Starting with version 5.0, you can set the `detect-eof` attribute, which sets the `blockToDetectEOF` property.
See [Reading from Streams](#stream-reading) for more information.

To configure the outbound channel adapter, you can use the namespace support as well.
The following example shows the different configuration options for outbound channel adapters:

```
<int-stream:stdout-channel-adapter id="stdoutAdapterWithDefaultCharset"
    channel="testChannel"/>

<int-stream:stdout-channel-adapter id="stdoutAdapterWithProvidedCharset" charset="UTF-8"
    channel="testChannel"/>

<int-stream:stderr-channel-adapter id="stderrAdapter" channel="testChannel"/>

<int-stream:stdout-channel-adapter id="newlineAdapter" append-newline="true"
    channel="testChannel"/>
```