scalability.md 16.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334
# 缩放和并行处理

## 缩放和并行处理

XMLJavaBoth

许多批处理问题可以通过单线程、单流程作业来解决,因此在考虑更复杂的实现之前,正确地检查它是否满足你的需求始终是一个好主意。衡量一项实际工作的性能,看看最简单的实现是否首先满足你的需求。即使使用标准的硬件,你也可以在一分钟内读写几百兆的文件。

Spring 当你准备好开始用一些并行处理来实现一个作业时, Spring Batch 提供了一系列选项,这些选项在本章中进行了描述,尽管其他地方也介绍了一些特性。在高层次上,有两种并行处理模式:

* 单过程、多线程

* 多进程

这些指标也可分为以下几类:

* 多线程步骤(单进程)

* 并行步骤(单一过程)

* 步骤的远程分块(多进程)

* 划分一个步骤(单个或多个进程)

首先,我们回顾一下单流程选项。然后,我们回顾了多进程的选择。

### 多线程步骤

启动并行处理的最简单方法是在步骤配置中添加`TaskExecutor`

例如,你可以添加`tasklet`的一个属性,如下所示:

```
<step id="loading">
    <tasklet task-executor="taskExecutor">...</tasklet>
</step>
```

当使用 Java 配置时,可以将`TaskExecutor`添加到该步骤中,如以下示例所示:

Java 配置

```
@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("spring_batch");
}

@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
	return this.stepBuilderFactory.get("sampleStep")
				.<String, String>chunk(10)
				.reader(itemReader())
				.writer(itemWriter())
				.taskExecutor(taskExecutor)
				.build();
}
```

在此示例中,`taskExecutor`是对另一个 Bean 定义的引用,该定义实现了`TaskExecutor`接口。[`TaskExecutor`](https://DOCS. Spring.io/ Spring/DOCS/current/javadoc-api/org/springframework/core/core/task/taskexecutor.html)是一个标准的 Spring 接口,因此请参阅 Spring 用户指南以获得可用实现的详细信息。最简单的多线程`TaskExecutor``SimpleAsyncTaskExecutor`

上述配置的结果是,`Step`通过在单独的执行线程中读取、处理和写入每个项块(每个提交间隔)来执行。请注意,这意味着要处理的项没有固定的顺序,并且块可能包含与单线程情况相比非连续的项。除了任务执行器设置的任何限制(例如它是否由线程池支持)之外,Tasklet 配置中还有一个油门限制,默认为 4。你可能需要增加这一点,以确保线程池得到充分利用。

例如,你可能会增加油门限制,如以下示例所示:

```
<step id="loading"> <tasklet
    task-executor="taskExecutor"
    throttle-limit="20">...</tasklet>
</step>
```

在使用 Java 配置时,构建器提供对油门限制的访问,如以下示例所示:

Java 配置

```
@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
	return this.stepBuilderFactory.get("sampleStep")
				.<String, String>chunk(10)
				.reader(itemReader())
				.writer(itemWriter())
				.taskExecutor(taskExecutor)
				.throttleLimit(20)
				.build();
}
```

还请注意,在你的步骤中使用的任何池资源都可能对并发性施加限制,例如`DataSource`。确保这些资源中的池至少与步骤中所需的并发线程数量一样大。

对于一些常见的批处理用例,使用多线程`Step`实现有一些实际的限制。`Step`中的许多参与者(例如读者和作者)是有状态的。如果状态不是由线程隔离的,那么这些组件在多线程`Step`中是不可用的。特别是, Spring 批中的大多数现成的读取器和编写器都不是为多线程使用而设计的。然而,可以使用无状态的或线程安全的读取器和编写器,并且在[Spring Batch Samples](https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples)中有一个示例(称为`parallelJob`),该示例显示了使用过程指示器(参见[防止状态持久性](readersAndWriters.html#process-indicator))来跟踪在数据库输入表中已处理的项。

Spring 批处理提供了`ItemWriter``ItemReader`的一些实现方式。通常,他们会在 Javadoc 中说明它们是否是线程安全的,或者你必须做什么来避免在并发环境中出现问题。如果 Javadoc 中没有信息,则可以检查实现,以查看是否存在任何状态。如果阅读器不是线程安全的,那么你可以使用提供的`SynchronizedItemStreamReader`来装饰它,或者在你自己的同步委托程序中使用它。你可以将调用同步到`read()`,并且只要处理和写入是块中最昂贵的部分,你的步骤仍然可以比在单线程配置中快得多地完成。

### 平行步骤

只要需要并行化的应用程序逻辑可以划分为不同的职责,并分配给各个步骤,那么就可以在单个流程中进行并行化。并行步骤执行很容易配置和使用。

例如,与`step3`并行执行`(step1,step2)`的步骤是直接的,如以下示例所示:

```
<job id="job1">
    <split id="split1" task-executor="taskExecutor" next="step4">
        <flow>
            <step id="step1" parent="s1" next="step2"/>
            <step id="step2" parent="s2"/>
        </flow>
        <flow>
            <step id="step3" parent="s3"/>
        </flow>
    </split>
    <step id="step4" parent="s4"/>
</job>

<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>
```

当使用 Java 配置时,与`(step1,step2)`并行执行步骤`step3`是很简单的,如以下示例所示:

Java 配置

```
@Bean
public Job job() {
    return jobBuilderFactory.get("job")
        .start(splitFlow())
        .next(step4())
        .build()        //builds FlowJobBuilder instance
        .build();       //builds Job instance
}

@Bean
public Flow splitFlow() {
    return new FlowBuilder<SimpleFlow>("splitFlow")
        .split(taskExecutor())
        .add(flow1(), flow2())
        .build();
}

@Bean
public Flow flow1() {
    return new FlowBuilder<SimpleFlow>("flow1")
        .start(step1())
        .next(step2())
        .build();
}

@Bean
public Flow flow2() {
    return new FlowBuilder<SimpleFlow>("flow2")
        .start(step3())
        .build();
}

@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("spring_batch");
}
```

可配置任务执行器用于指定应该使用哪个`TaskExecutor`实现来执行各个流。默认值是`SyncTaskExecutor`,但是需要一个异步`TaskExecutor`来并行运行这些步骤。请注意,该作业确保在聚合退出状态和转换之前,拆分中的每个流都已完成。

有关更多详细信息,请参见[拆分流](step.html#split-flows)一节。

### 远程分块

在远程分块中,`Step`处理被分割到多个进程中,通过一些中间件相互通信。下图显示了该模式:

![远程分块](https://docs.spring.io/spring-batch/docs/current/reference/html/images/remote-chunking.png)

图 1。远程分块

Manager 组件是一个单独的进程,工作人员是多个远程进程。如果 Manager 不是瓶颈,那么这种模式最有效,因此处理必须比读取项目更昂贵(在实践中通常是这种情况)。

Manager 是 Spring 批处理`Step`的实现,其中`ItemWriter`被一个通用版本代替,该版本知道如何将项目块作为消息发送到中间件。工人是正在使用的任何中间件的标准侦听器(例如,对于 JMS,他们将是`MessageListener`实现),他们的角色是通过`ItemWriter``ItemProcessor`加上`ItemWriter`接口使用标准的项块。使用这种模式的优点之一是读写器、处理器和写写器组件是现成的(与用于步骤的本地执行的组件相同)。这些项是动态划分的,工作是通过中间件共享的,因此,如果侦听器都是热心的消费者,那么负载平衡就是自动的。

中间件必须是持久的,保证交付,并且每条消息只有一个使用者。JMS 是显而易见的候选者,但在网格计算和共享内存产品空间中存在其他选项(例如 JavaSpace)。

有关更多详细信息,请参见[Spring Batch Integration - Remote Chunking](spring-batch-integration.html#remote-chunking)一节。

### 分区

Spring 批处理还提供了用于分区`Step`执行并远程执行它的 SPI。在这种情况下,远程参与者是`Step`实例,这些实例可以很容易地被配置并用于本地处理。下图显示了该模式:

![分区概述](https://docs.spring.io/spring-batch/docs/current/reference/html/images/partitioning-overview.png)

图 2。划分

`Job`作为`Step`实例的序列在左侧运行,其中一个`Step`实例被标记为管理器。这张图中的工人都是`Step`的相同实例,它实际上可以代替经理,从而导致`Job`的结果相同。工作人员通常是远程服务,但也可能是执行的本地线程。在此模式中,经理发送给工作人员的消息不需要是持久的,也不需要有保证的交付。 Spring `JobRepository`中的批处理元数据确保每个工作者执行一次,并且对于每个`Job`执行只执行一次。

Spring 批处理中的 SPI 由`Step`(称为`PartitionStep`)的特殊实现和需要为特定环境实现的两个策略接口组成。策略接口是`PartitionHandler``StepExecutionSplitter`,它们的作用在下面的序列图中显示:

![分区 SPI](https://docs.spring.io/spring-batch/docs/current/reference/html/images/partitioning-spi.png)

图 3。分区 SPI

在这种情况下,右边的`Step`是“远程”工作者,因此,潜在地,有许多对象和或进程在扮演这个角色,并且`PartitionStep`被显示为驱动执行。

下面的示例显示了使用 XML 配置时的`PartitionStep`配置:

```
<step id="step1.manager">
    <partition step="step1" partitioner="partitioner">
        <handler grid-size="10" task-executor="taskExecutor"/>
    </partition>
</step>
```

下面的示例显示了使用 Java 配置时的`PartitionStep`配置:

Java 配置

```
@Bean
public Step step1Manager() {
    return stepBuilderFactory.get("step1.manager")
        .<String, String>partitioner("step1", partitioner())
        .step(step1())
        .gridSize(10)
        .taskExecutor(taskExecutor())
        .build();
}
```

与多线程步骤的`throttle-limit`属性类似,`grid-size`属性防止任务执行器被来自单个步骤的请求饱和。

有一个简单的示例,可以在[Spring Batch Samples](https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples/src/main/resources/jobs)的单元测试套件中进行复制和扩展(参见`partition*Job.xml`配置)。

Spring 批处理为被称为“Step1:Partition0”的分区创建步骤执行,以此类推。为了保持一致性,许多人更喜欢将 Manager 步骤称为“Step1:Manager”。你可以为步骤使用别名(通过指定`name`属性而不是`id`属性)。

#### 分区处理程序

`PartitionHandler`是了解远程或网格环境结构的组件。它能够将`StepExecution`请求发送到远程`Step`实例,并以某种特定于织物的格式包装,例如 DTO。它不需要知道如何分割输入数据或如何聚合多个`Step`执行的结果。一般来说,它可能也不需要了解弹性或故障转移,因为在许多情况下,这些都是织物的功能。在任何情况下, Spring 批处理总是提供独立于织物的重启性。失败的`Job`总是可以重新启动,并且只重新执行失败的`Steps`

`PartitionHandler`接口可以为各种结构类型提供专门的实现,包括简单的 RMI 远程处理、EJB 远程处理、自定义 Web 服务、JMS、Java 空间、共享内存网格(如 Terracotta 或 Coherence)和网格执行结构(如 GridGain)。 Spring 批处理不包含用于任何专有网格或远程织物的实现方式。

Spring 然而,批处理确实提供了`PartitionHandler`的一种有用的实现,该实现使用 Spring 中的`TaskExecutor`策略,在单独的执行线程中本地执行`Step`实例。该实现被称为`TaskExecutorPartitionHandler`

`TaskExecutorPartitionHandler`是使用前面显示的 XML 名称空间进行配置的步骤的默认值。也可以显式地对其进行配置,如以下示例所示:

```
<step id="step1.manager">
    <partition step="step1" handler="handler"/>
</step>

<bean class="org.spr...TaskExecutorPartitionHandler">
    <property name="taskExecutor" ref="taskExecutor"/>
    <property name="step" ref="step1" />
    <property name="gridSize" value="10" />
</bean>
```

`TaskExecutorPartitionHandler`可以在 Java 配置中显式地进行配置,如以下示例所示:

Java 配置

```
@Bean
public Step step1Manager() {
    return stepBuilderFactory.get("step1.manager")
        .partitioner("step1", partitioner())
        .partitionHandler(partitionHandler())
        .build();
}

@Bean
public PartitionHandler partitionHandler() {
    TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
    retVal.setTaskExecutor(taskExecutor());
    retVal.setStep(step1());
    retVal.setGridSize(10);
    return retVal;
}
```

`gridSize`属性决定要创建的独立步骤执行的数量,因此它可以与`TaskExecutor`中线程池的大小匹配。或者,可以将其设置为比可用的线程数量更大,这使得工作块更小。

`TaskExecutorPartitionHandler`对于 IO 密集型`Step`实例很有用,例如复制大量文件或将文件系统复制到内容管理系统中。它还可以通过提供`Step`实现来用于远程执行,该实现是远程调用的代理(例如使用 Spring remoting)。

#### 分割者

`Partitioner`有一个更简单的职责:仅为新的步骤执行生成执行上下文作为输入参数(无需担心重新启动)。它只有一个方法,如下面的接口定义所示:

```
public interface Partitioner {
    Map<String, ExecutionContext> partition(int gridSize);
}
```

这个方法的返回值将每个步骤执行的唯一名称(`String`)与输入参数(`ExecutionContext`)以`ExecutionContext`的形式关联起来。这些名称稍后会在批处理元数据中显示为分区`StepExecutions`中的步骤名称。`ExecutionContext`只是一组名称-值对,因此它可能包含一系列主键、行号或输入文件的位置。然后,远程`Step`通常使用`#{…​}`占位符(在步骤作用域中的后期绑定)绑定到上下文输入,如下一节所示。

步骤执行的名称(由`Partitioner`返回的`Map`中的键)需要在`Job`的步骤执行中是唯一的,但没有任何其他特定的要求。要做到这一点(并使名称对用户有意义),最简单的方法是使用前缀 + 后缀命名约定,其中前缀是正在执行的步骤的名称(它本身在`Job`中是唯一的),后缀只是一个计数器。在使用该约定的框架中有一个`SimplePartitioner`

可以使用一个名为`PartitionNameProvider`的可选接口来提供与分区本身分开的分区名称。如果`Partitioner`实现了这个接口,那么在重新启动时,只会查询名称。如果分区是昂贵的,这可以是一个有用的优化。由`PartitionNameProvider`提供的名称必须与`Partitioner`提供的名称匹配。

#### 将输入数据绑定到步骤

`PartitionHandler`执行的步骤具有相同的配置,并且它们的输入参数在运行时从`ExecutionContext`绑定,这是非常有效的。 Spring 批处理的 StepScope 特性很容易做到这一点(在[后期绑定](step.html#late-binding)一节中更详细地介绍)。例如,如果`Partitioner`使用一个名为`fileName`的属性键创建`ExecutionContext`实例,并针对每个步骤调用指向不同的文件(或目录),则`Partitioner`输出可能类似于下表的内容:

|*步骤执行名称(键)*|*ExecutionContext (value)*|
|---------------------------|--------------------------|
|filecopy:分区 0| fileName=/home/data/one  |
|filecopy:partition1| fileName=/home/data/two  |
|filecopy:partition2|fileName=/home/data/three |

然后,可以使用与执行上下文的后期绑定将文件名绑定到一个步骤。

下面的示例展示了如何在 XML 中定义后期绑定:

XML 配置

```
<bean id="itemReader" scope="step"
      class="org.spr...MultiResourceItemReader">
    <property name="resources" value="#{stepExecutionContext[fileName]}/*"/>
</bean>
```

下面的示例展示了如何在 Java 中定义后期绑定:

Java 配置

```
@Bean
public MultiResourceItemReader itemReader(
	@Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) {
	return new MultiResourceItemReaderBuilder<String>()
			.delegate(fileReader())
			.name("itemReader")
			.resources(resources)
			.build();
}
```