stream.md 6.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
# 流支持

## 流支持

在许多情况下,应用程序数据是从流获得的。不建议将对流的引用作为消息有效负载发送给消费者。相反,消息是从从输入流中读取的数据创建的,消息有效负载被一个接一个地写入输出流。

你需要在项目中包含此依赖项:

Maven

```
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
    <version>5.5.9</version>
</dependency>
```

Gradle

```
compile "org.springframework.integration:spring-integration-stream:5.5.9"
```

### 从流中读取

Spring 集成为流提供了两个适配器。`ByteStreamReadingMessageSource``CharacterStreamReadingMessageSource`都实现`MessageSource`。通过在通道适配器元素中配置其中的一个,可以配置轮询周期,并且消息总线可以自动检测和调度它们。字节流版本需要`InputStream`,字符流版本需要`Reader`作为单个构造函数参数。`ByteStreamReadingMessageSource`还接受“bytesperMessage”属性,以确定它试图将多少字节读入每个`Message`。默认值是 1024。下面的示例创建了一个输入流,该输入流创建了每个包含 2048 个字节的消息:

```
<bean class="org.springframework.integration.stream.ByteStreamReadingMessageSource">
  <constructor-arg ref="someInputStream"/>
  <property name="bytesPerMessage" value="2048"/>
</bean>

<bean class="org.springframework.integration.stream.CharacterStreamReadingMessageSource">
  <constructor-arg ref="someReader"/>
</bean>
```

`CharacterStreamReadingMessageSource`将阅读器包装在`BufferedReader`中(如果它还不是一个)。你可以在第二个构造函数参数中设置缓冲读取器所使用的缓冲区大小。从版本 5.0 开始,第三个构造函数参数(`blockToDetectEOF`)控制`CharacterStreamReadingMessageSource`的行为。当`false`(默认值)时,`receive()`方法检查读取器是否为`ready()`,如果不是,则返回 null。在这种情况下,没有检测到 EOF(文件结束)。当`true`时,`receive()`方法将阻塞,直到数据可用或在底层流上检测到 EOF 为止。当检测到 EOF 时,将发布`StreamClosedEvent`(应用程序事件)。你可以使用实现`ApplicationListener<StreamClosedEvent>`的 Bean 来使用此事件。

|   |为了方便 EOF 检测,poller 线程在`receive()`方法中阻塞,直到数据到达或 EOF 检测到为止。|
|---|-----------------------------------------------------------------------------------------------------------------------------|

|   |投票者继续在每一次投票中公布一次事件。一旦检测到 EOF。<br/>应用程序侦听器可以停止适配器以防止这种情况发生。<br/>事件在 Poller 线程上发布。<br/>停止适配器会导致线程中断。<br/>如果你打算在停止适配器后执行某些可中断任务,你必须在不同的线程上执行`stop()`,或者为这些下游活动使用不同的线程。<br/>注意,向`QueueChannel`发送消息是可中断的,因此,如果你希望从侦听器发送消息,请在停止适配器之前执行此操作。|
|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

这有利于“管道传输”或将数据重定向到`stdin`,如下两个示例所示:

```
cat myfile.txt | java -jar my.jar
```

```
java -jar my.jar < foo.txt
```

这种方法允许应用程序在管道关闭时停止。

有四种方便的工厂方法可供选择:

```
public static final CharacterStreamReadingMessageSource stdin() { ... }

public static final CharacterStreamReadingMessageSource stdin(String charsetName) { ... }

public static final CharacterStreamReadingMessageSource stdinPipe() { ... }

public static final CharacterStreamReadingMessageSource stdinPipe(String charsetName) { ... }
```

### 写到流

对于目标流,可以使用两种实现方式中的任意一种:`ByteStreamWritingMessageHandler``CharacterStreamWritingMessageHandler`。每个都需要一个构造函数参数(对于字节流,`OutputStream`;对于字符流,`Writer`),并且每个都提供了第二个构造函数,该构造函数添加了可选的“缓冲区大小”。因为这两个最终都实现了`MessageHandler`接口,所以你可以从`channel-adapter`配置中引用它们,如[通道适配器](./channel-adapter.html#channel-adapter)中所述。

```
<bean class="org.springframework.integration.stream.ByteStreamWritingMessageHandler">
  <constructor-arg ref="someOutputStream"/>
  <constructor-arg value="1024"/>
</bean>

<bean class="org.springframework.integration.stream.CharacterStreamWritingMessageHandler">
  <constructor-arg ref="someWriter"/>
</bean>
```

### 流名称空间支持

Spring 集成定义了一个名称空间,以减少与流相关的通道适配器所需的配置。要使用它,需要以下模式位置:

```
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:beans="http://www.springframework.org/schema/beans"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
      https://www.springframework.org/schema/beans/spring-beans.xsd
      http://www.springframework.org/schema/integration/stream
      https://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">
```

以下代码片段显示了配置入站通道适配器所支持的不同配置选项:

```
<int-stream:stdin-channel-adapter id="adapterWithDefaultCharset"/>

<int-stream:stdin-channel-adapter id="adapterWithProvidedCharset" charset="UTF-8"/>
```

从版本 5.0 开始,你可以设置`detect-eof`属性,该属性设置`blockToDetectEOF`属性。有关更多信息,请参见[从流中阅读](#stream-reading)

要配置出站通道适配器,还可以使用名称空间支持。下面的示例显示了出站通道适配器的不同配置:

```
<int-stream:stdout-channel-adapter id="stdoutAdapterWithDefaultCharset"
    channel="testChannel"/>

<int-stream:stdout-channel-adapter id="stdoutAdapterWithProvidedCharset" charset="UTF-8"
    channel="testChannel"/>

<int-stream:stderr-channel-adapter id="stderrAdapter" channel="testChannel"/>

<int-stream:stdout-channel-adapter id="newlineAdapter" append-newline="true"
    channel="testChannel"/>
```