# 缩放和并行处理 ## [](#scalability)缩放和并行处理 XMLJavaBoth 许多批处理问题可以通过单线程、单流程作业来解决,因此在考虑更复杂的实现之前,正确地检查它是否满足你的需求始终是一个好主意。衡量一项实际工作的性能,看看最简单的实现是否首先满足你的需求。即使使用标准的硬件,你也可以在一分钟内读写几百兆的文件。 Spring 当你准备好开始用一些并行处理来实现一个作业时, Spring Batch 提供了一系列选项,这些选项在本章中进行了描述,尽管其他地方也介绍了一些特性。在高层次上,有两种并行处理模式: * 单过程、多线程 * 多进程 这些指标也可分为以下几类: * 多线程步骤(单进程) * 并行步骤(单一过程) * 步骤的远程分块(多进程) * 划分一个步骤(单个或多个进程) 首先,我们回顾一下单流程选项。然后,我们回顾了多进程的选择。 ### [](#multithreadedStep)多线程步骤 启动并行处理的最简单方法是在步骤配置中添加`TaskExecutor`。 例如,你可以添加`tasklet`的一个属性,如下所示: ``` ... ``` 当使用 Java 配置时,可以将`TaskExecutor`添加到该步骤中,如以下示例所示: Java 配置 ``` @Bean public TaskExecutor taskExecutor() { return new SimpleAsyncTaskExecutor("spring_batch"); } @Bean public Step sampleStep(TaskExecutor taskExecutor) { return this.stepBuilderFactory.get("sampleStep") .chunk(10) .reader(itemReader()) .writer(itemWriter()) .taskExecutor(taskExecutor) .build(); } ``` 在此示例中,`taskExecutor`是对另一个 Bean 定义的引用,该定义实现了`TaskExecutor`接口。[`TaskExecutor`](https://DOCS. Spring.io/ Spring/DOCS/current/javadoc-api/org/springframework/core/core/task/taskexecutor.html)是一个标准的 Spring 接口,因此请参阅 Spring 用户指南以获得可用实现的详细信息。最简单的多线程`TaskExecutor`是`SimpleAsyncTaskExecutor`。 上述配置的结果是,`Step`通过在单独的执行线程中读取、处理和写入每个项块(每个提交间隔)来执行。请注意,这意味着要处理的项没有固定的顺序,并且块可能包含与单线程情况相比非连续的项。除了任务执行器设置的任何限制(例如它是否由线程池支持)之外,Tasklet 配置中还有一个油门限制,默认为 4。你可能需要增加这一点,以确保线程池得到充分利用。 例如,你可能会增加油门限制,如以下示例所示: ``` ... ``` 在使用 Java 配置时,构建器提供对油门限制的访问,如以下示例所示: Java 配置 ``` @Bean public Step sampleStep(TaskExecutor taskExecutor) { return this.stepBuilderFactory.get("sampleStep") .chunk(10) .reader(itemReader()) .writer(itemWriter()) .taskExecutor(taskExecutor) .throttleLimit(20) .build(); } ``` 还请注意,在你的步骤中使用的任何池资源都可能对并发性施加限制,例如`DataSource`。确保这些资源中的池至少与步骤中所需的并发线程数量一样大。 对于一些常见的批处理用例,使用多线程`Step`实现有一些实际的限制。`Step`中的许多参与者(例如读者和作者)是有状态的。如果状态不是由线程隔离的,那么这些组件在多线程`Step`中是不可用的。特别是, Spring 批中的大多数现成的读取器和编写器都不是为多线程使用而设计的。然而,可以使用无状态的或线程安全的读取器和编写器,并且在[Spring Batch Samples](https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples)中有一个示例(称为`parallelJob`),该示例显示了使用过程指示器(参见[防止状态持久性](readersAndWriters.html#process-indicator))来跟踪在数据库输入表中已处理的项。 Spring 批处理提供了`ItemWriter`和`ItemReader`的一些实现方式。通常,他们会在 Javadoc 中说明它们是否是线程安全的,或者你必须做什么来避免在并发环境中出现问题。如果 Javadoc 中没有信息,则可以检查实现,以查看是否存在任何状态。如果阅读器不是线程安全的,那么你可以使用提供的`SynchronizedItemStreamReader`来装饰它,或者在你自己的同步委托程序中使用它。你可以将调用同步到`read()`,并且只要处理和写入是块中最昂贵的部分,你的步骤仍然可以比在单线程配置中快得多地完成。 ### [](#scalabilityParallelSteps)平行步骤 只要需要并行化的应用程序逻辑可以划分为不同的职责,并分配给各个步骤,那么就可以在单个流程中进行并行化。并行步骤执行很容易配置和使用。 例如,与`step3`并行执行`(step1,step2)`的步骤是直接的,如以下示例所示: ``` ``` 当使用 Java 配置时,与`(step1,step2)`并行执行步骤`step3`是很简单的,如以下示例所示: Java 配置 ``` @Bean public Job job() { return jobBuilderFactory.get("job") .start(splitFlow()) .next(step4()) .build() //builds FlowJobBuilder instance .build(); //builds Job instance } @Bean public Flow splitFlow() { return new FlowBuilder("splitFlow") .split(taskExecutor()) .add(flow1(), flow2()) .build(); } @Bean public Flow flow1() { return new FlowBuilder("flow1") .start(step1()) .next(step2()) .build(); } @Bean public Flow flow2() { return new FlowBuilder("flow2") .start(step3()) .build(); } @Bean public TaskExecutor taskExecutor() { return new SimpleAsyncTaskExecutor("spring_batch"); } ``` 可配置任务执行器用于指定应该使用哪个`TaskExecutor`实现来执行各个流。默认值是`SyncTaskExecutor`,但是需要一个异步`TaskExecutor`来并行运行这些步骤。请注意,该作业确保在聚合退出状态和转换之前,拆分中的每个流都已完成。 有关更多详细信息,请参见[拆分流](step.html#split-flows)一节。 ### [](#remoteChunking)远程分块 在远程分块中,`Step`处理被分割到多个进程中,通过一些中间件相互通信。下图显示了该模式: ![远程分块](./images/remote-chunking.png) 图 1。远程分块 Manager 组件是一个单独的进程,工作人员是多个远程进程。如果 Manager 不是瓶颈,那么这种模式最有效,因此处理必须比读取项目更昂贵(在实践中通常是这种情况)。 Manager 是 Spring 批处理`Step`的实现,其中`ItemWriter`被一个通用版本代替,该版本知道如何将项目块作为消息发送到中间件。工人是正在使用的任何中间件的标准侦听器(例如,对于 JMS,他们将是`MessageListener`实现),他们的角色是通过`ItemWriter`或`ItemProcessor`加上`ItemWriter`接口使用标准的项块。使用这种模式的优点之一是读写器、处理器和写写器组件是现成的(与用于步骤的本地执行的组件相同)。这些项是动态划分的,工作是通过中间件共享的,因此,如果侦听器都是热心的消费者,那么负载平衡就是自动的。 中间件必须是持久的,保证交付,并且每条消息只有一个使用者。JMS 是显而易见的候选者,但在网格计算和共享内存产品空间中存在其他选项(例如 JavaSpace)。 有关更多详细信息,请参见[Spring Batch Integration - Remote Chunking](spring-batch-integration.html#remote-chunking)一节。 ### [](#partitioning)分区 Spring 批处理还提供了用于分区`Step`执行并远程执行它的 SPI。在这种情况下,远程参与者是`Step`实例,这些实例可以很容易地被配置并用于本地处理。下图显示了该模式: ![分区概述](./images/partitioning-overview.png) 图 2。划分 `Job`作为`Step`实例的序列在左侧运行,其中一个`Step`实例被标记为管理器。这张图中的工人都是`Step`的相同实例,它实际上可以代替经理,从而导致`Job`的结果相同。工作人员通常是远程服务,但也可能是执行的本地线程。在此模式中,经理发送给工作人员的消息不需要是持久的,也不需要有保证的交付。 Spring `JobRepository`中的批处理元数据确保每个工作者执行一次,并且对于每个`Job`执行只执行一次。 Spring 批处理中的 SPI 由`Step`(称为`PartitionStep`)的特殊实现和需要为特定环境实现的两个策略接口组成。策略接口是`PartitionHandler`和`StepExecutionSplitter`,它们的作用在下面的序列图中显示: ![分区 SPI](./images/partitioning-spi.png) 图 3。分区 SPI 在这种情况下,右边的`Step`是“远程”工作者,因此,潜在地,有许多对象和或进程在扮演这个角色,并且`PartitionStep`被显示为驱动执行。 下面的示例显示了使用 XML 配置时的`PartitionStep`配置: ``` ``` 下面的示例显示了使用 Java 配置时的`PartitionStep`配置: Java 配置 ``` @Bean public Step step1Manager() { return stepBuilderFactory.get("step1.manager") .partitioner("step1", partitioner()) .step(step1()) .gridSize(10) .taskExecutor(taskExecutor()) .build(); } ``` 与多线程步骤的`throttle-limit`属性类似,`grid-size`属性防止任务执行器被来自单个步骤的请求饱和。 有一个简单的示例,可以在[Spring Batch Samples](https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples/src/main/resources/jobs)的单元测试套件中进行复制和扩展(参见`partition*Job.xml`配置)。 Spring 批处理为被称为“Step1:Partition0”的分区创建步骤执行,以此类推。为了保持一致性,许多人更喜欢将 Manager 步骤称为“Step1:Manager”。你可以为步骤使用别名(通过指定`name`属性而不是`id`属性)。 #### [](#partitionHandler)分区处理程序 `PartitionHandler`是了解远程或网格环境结构的组件。它能够将`StepExecution`请求发送到远程`Step`实例,并以某种特定于织物的格式包装,例如 DTO。它不需要知道如何分割输入数据或如何聚合多个`Step`执行的结果。一般来说,它可能也不需要了解弹性或故障转移,因为在许多情况下,这些都是织物的功能。在任何情况下, Spring 批处理总是提供独立于织物的重启性。失败的`Job`总是可以重新启动,并且只重新执行失败的`Steps`。 `PartitionHandler`接口可以为各种结构类型提供专门的实现,包括简单的 RMI 远程处理、EJB 远程处理、自定义 Web 服务、JMS、Java 空间、共享内存网格(如 Terracotta 或 Coherence)和网格执行结构(如 GridGain)。 Spring 批处理不包含用于任何专有网格或远程织物的实现方式。 Spring 然而,批处理确实提供了`PartitionHandler`的一种有用的实现,该实现使用 Spring 中的`TaskExecutor`策略,在单独的执行线程中本地执行`Step`实例。该实现被称为`TaskExecutorPartitionHandler`。 `TaskExecutorPartitionHandler`是使用前面显示的 XML 名称空间进行配置的步骤的默认值。也可以显式地对其进行配置,如以下示例所示: ``` ``` `TaskExecutorPartitionHandler`可以在 Java 配置中显式地进行配置,如以下示例所示: Java 配置 ``` @Bean public Step step1Manager() { return stepBuilderFactory.get("step1.manager") .partitioner("step1", partitioner()) .partitionHandler(partitionHandler()) .build(); } @Bean public PartitionHandler partitionHandler() { TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler(); retVal.setTaskExecutor(taskExecutor()); retVal.setStep(step1()); retVal.setGridSize(10); return retVal; } ``` `gridSize`属性决定要创建的独立步骤执行的数量,因此它可以与`TaskExecutor`中线程池的大小匹配。或者,可以将其设置为比可用的线程数量更大,这使得工作块更小。 `TaskExecutorPartitionHandler`对于 IO 密集型`Step`实例很有用,例如复制大量文件或将文件系统复制到内容管理系统中。它还可以通过提供`Step`实现来用于远程执行,该实现是远程调用的代理(例如使用 Spring remoting)。 #### [](#partitioner)分割者 `Partitioner`有一个更简单的职责:仅为新的步骤执行生成执行上下文作为输入参数(无需担心重新启动)。它只有一个方法,如下面的接口定义所示: ``` public interface Partitioner { Map partition(int gridSize); } ``` 这个方法的返回值将每个步骤执行的唯一名称(`String`)与输入参数(`ExecutionContext`)以`ExecutionContext`的形式关联起来。这些名称稍后会在批处理元数据中显示为分区`StepExecutions`中的步骤名称。`ExecutionContext`只是一组名称-值对,因此它可能包含一系列主键、行号或输入文件的位置。然后,远程`Step`通常使用`#{…​}`占位符(在步骤作用域中的后期绑定)绑定到上下文输入,如下一节所示。 步骤执行的名称(由`Partitioner`返回的`Map`中的键)需要在`Job`的步骤执行中是唯一的,但没有任何其他特定的要求。要做到这一点(并使名称对用户有意义),最简单的方法是使用前缀 + 后缀命名约定,其中前缀是正在执行的步骤的名称(它本身在`Job`中是唯一的),后缀只是一个计数器。在使用该约定的框架中有一个`SimplePartitioner`。 可以使用一个名为`PartitionNameProvider`的可选接口来提供与分区本身分开的分区名称。如果`Partitioner`实现了这个接口,那么在重新启动时,只会查询名称。如果分区是昂贵的,这可以是一个有用的优化。由`PartitionNameProvider`提供的名称必须与`Partitioner`提供的名称匹配。 #### [](#bindingInputDataToSteps)将输入数据绑定到步骤 由`PartitionHandler`执行的步骤具有相同的配置,并且它们的输入参数在运行时从`ExecutionContext`绑定,这是非常有效的。 Spring 批处理的 StepScope 特性很容易做到这一点(在[后期绑定](step.html#late-binding)一节中更详细地介绍)。例如,如果`Partitioner`使用一个名为`fileName`的属性键创建`ExecutionContext`实例,并针对每个步骤调用指向不同的文件(或目录),则`Partitioner`输出可能类似于下表的内容: |*步骤执行名称(键)*|*ExecutionContext (value)*| |---------------------------|--------------------------| |filecopy:分区 0| fileName=/home/data/one | |filecopy:partition1| fileName=/home/data/two | |filecopy:partition2|fileName=/home/data/three | 然后,可以使用与执行上下文的后期绑定将文件名绑定到一个步骤。 下面的示例展示了如何在 XML 中定义后期绑定: XML 配置 ``` ``` 下面的示例展示了如何在 Java 中定义后期绑定: Java 配置 ``` @Bean public MultiResourceItemReader itemReader( @Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) { return new MultiResourceItemReaderBuilder() .delegate(fileReader()) .name("itemReader") .resources(resources) .build(); } ```