# 批处理的领域语言 ## [](#domainLanguageOfBatch)批处理的域语言 XMLJavaBoth 对于任何有经验的批处理架构师来说, Spring 批处理中使用的批处理的总体概念应该是熟悉和舒适的。有“作业”和“步骤”以及开发人员提供的处理单元,分别称为`ItemReader`和`ItemWriter`。然而,由于 Spring 模式、操作、模板、回调和习惯用法,有以下机会: * 在坚持明确区分关注事项方面有了显著改善。 * 清晰地描述了作为接口提供的体系结构层和服务。 * 简单和默认的实现,允许快速采用和易于使用的开箱即用。 * 显著增强了可扩展性。 下图是使用了几十年的批处理引用体系结构的简化版本。它提供了组成批处理领域语言的组件的概述。这个架构框架是一个蓝图,已经通过过去几代平台(COBOL/大型机、C/UNIX 和现在的 Java/Anywhere)上几十年的实现得到了证明。JCL 和 COBOL 开发人员可能与 C、C# 和 Java 开发人员一样熟悉这些概念。 Spring 批处理提供了通常在健壮的、可维护的系统中发现的层、组件和技术服务的物理实现,这些系统被用于解决创建简单到复杂的批处理应用程序,具有用于解决非常复杂的处理需求的基础设施和扩展。 ![图 2.1:批处理原型](./images/spring-batch-reference-model.png) 图 1。批处理模式 前面的图表突出了构成 Spring 批处理的域语言的关键概念。一个作业有一个到多个步骤,每个步骤正好有一个`ItemReader`,一个`ItemProcessor`和一个`ItemWriter`。需要启动一个作业(使用`JobLauncher`),并且需要存储有关当前运行的进程的元数据(在`JobRepository`中)。 ### [](#job)工作 这一部分描述了与批处理作业的概念有关的刻板印象。`Job`是封装整个批处理过程的实体。与其他 Spring 项目一样,`Job`与 XML 配置文件或基于 Java 的配置连接在一起。这种配置可以称为“作业配置”。然而,`Job`只是整个层次结构的顶部,如下图所示: ![工作层次结构](./images/job-heirarchy.png) 图 2。工作层次结构 在 Spring 批处理中,`Job`只是用于`Step`实例的容器。它将逻辑上属于一个流的多个步骤组合在一起,并允许将属性的全局配置用于所有步骤,例如可重启性。作业配置包含: * 工作的简单名称。 * `Step`实例的定义和排序。 * 这份工作是否可以重新启动。 对于那些使用 Java 配置的人, Spring Batch 以`SimpleJob`类的形式提供了作业接口的默认实现,它在`Job`之上创建了一些标准功能。当使用基于 Java 的配置时,可以使用一个构建器集合来实例化`Job`,如以下示例所示: ``` @Bean public Job footballJob() { return this.jobBuilderFactory.get("footballJob") .start(playerLoad()) .next(gameLoad()) .next(playerSummarization()) .build(); } ``` 对于那些使用 XML 配置的人, Spring Batch 以`SimpleJob`类的形式提供了`Job`接口的默认实现,它在`Job`之上创建了一些标准功能。然而,批处理名称空间抽象出了直接实例化它的需要。相反,可以使用``元素,如以下示例所示: ``` ``` #### [](#jobinstance)JobInstance a`JobInstance`指的是逻辑作业运行的概念。考虑应该在一天结束时运行一次的批处理作业,例如前面图表中的“endofday”`Job`。有一个“endofday”作业,但是`Job`的每个单独运行都必须单独跟踪。在这种情况下,每天有一个逻辑`JobInstance`。例如,有一个 1 月 1 日运行,1 月 2 日运行,以此类推。如果 1 月 1 日运行第一次失败,并在第二天再次运行,它仍然是 1 月 1 日运行。(通常,这也对应于它正在处理的数据,这意味着 1 月 1 日运行处理 1 月 1 日的数据)。因此,每个`JobInstance`都可以有多个执行(`JobExecution`在本章后面更详细地讨论),并且只有一个`JobInstance`对应于特定的`Job`和标识`JobParameters`的执行可以在给定的时间运行。 `JobInstance`的定义与要加载的数据完全无关。这完全取决于`ItemReader`实现来确定如何加载数据。例如,在 Endofday 场景中,数据上可能有一个列,该列指示数据所属的“生效日期”或“计划日期”。因此,1 月 1 日的运行将只加载 1 日的数据,而 1 月 2 日的运行将只使用 2 日的数据。因为这个决定很可能是一个商业决定,所以它是由`ItemReader`来决定的。然而,使用相同的`JobInstance`确定是否使用来自先前执行的’状态’(即`ExecutionContext`,这将在本章后面讨论)。使用一个新的`JobInstance`表示“从开始”,而使用一个现有的实例通常表示“从你停止的地方开始”。 #### [](#jobparameters)JobParameters 在讨论了`JobInstance`以及它与约伯有何不同之后,我们自然要问的问题是:“一个`JobInstance`如何与另一个区分开来?”答案是:`JobParameters`。`JobParameters`对象持有一组用于启动批处理作业的参数。它们可以用于标识,甚至在运行过程中作为参考数据,如下图所示: ![作业参数](./images/job-stereotypes-parameters.png) 图 3。作业参数 在前面的示例中,有两个实例,一个用于 1 月 1 日,另一个用于 1 月 2 日,实际上只有一个`Job`,但它有两个`JobParameter`对象:一个以 01-01-2017 的作业参数启动,另一个以 01-02-2017 的参数启动。因此,契约可以定义为:`JobInstance`=`Job`+ 标识`JobParameters`。这允许开发人员有效地控制`JobInstance`的定义方式,因为他们控制传入的参数。 | |并非所有作业参数都需要有助于识别`JobInstance`。在默认情况下,他们会这么做。但是,该框架还允许使用不影响`JobInstance`的恒等式的参数提交
的`Job`。| |---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| #### [](#jobexecution)jobexecution a`JobExecution`指的是一次尝试运行作业的技术概念。一次执行可能以失败或成功结束,但除非执行成功完成,否则对应于给定执行的`JobInstance`不被认为是完成的。以前面描述的 Endofday`Job`为例,考虑第一次运行时失败的 01-01-2017 的`JobInstance`。如果以与第一次运行(01-01-2017)相同的标识作业参数再次运行,则会创建一个新的`JobExecution`。然而,仍然只有一个`JobInstance`。 `Job`定义了什么是作业以及如何执行它,而`JobInstance`是一个纯粹的组织对象,用于将执行分组在一起,主要是为了启用正确的重新启动语义。但是,`JobExecution`是运行期间实际发生的事情的主要存储机制,并且包含许多必须控制和持久化的属性,如下表所示: | Property |定义| |-----------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | Status |一个`BatchStatus`对象,它指示执行的状态。在运行时,它是`BatchStatus#STARTED`。如果失败,则为`BatchStatus#FAILED`。如果成功完成
,则为`BatchStatus#COMPLETED`| | startTime |a`java.util.Date`表示开始执行的当前系统时间。
如果作业尚未开始,则此字段为空。| | endTime |a`java.util.Date`表示当前系统执行完成时的时间,
不管是否成功。如果作业尚未
完成,则该字段为空。| | exitStatus |`ExitStatus`,表示运行的结果。它是最重要的,因为它
包含一个返回给调用方的退出代码。有关更多详细信息,请参见第 5 章。如果作业尚未完成,则
字段为空。| | createTime |a`java.util.Date`表示当`JobExecution`是
第一次持续存在时的当前系统时间。作业可能尚未启动(因此没有启动时间),但是
它总是有一个 CreateTime,这是管理作业级别`ExecutionContexts`的框架所要求的。| | lastUpdated |a`java.util.Date`表示上次持久化 a`JobExecution`。如果作业尚未开始,则该字段
为空。| |executionContext |“属性袋”包含在
执行之间需要持久化的任何用户数据。| |failureExceptions|在执行`Job`时遇到的异常列表。如果在`Job`失败期间遇到了多个异常,则这些规则是有用的
。| 这些属性很重要,因为它们是持久的,可以用来完全确定执行的状态。例如,如果 01-01 的 Endofday 作业在晚上 9:00 执行,并在 9:30 失败,则在批处理元数据表中创建以下条目: |工作 \_INST\_ID| JOB\_NAME | |-------------|-----------| | 1 |EndOfDayJob| |作业 \_ 执行 \_ID|TYPE\_CD| KEY\_NAME |DATE\_VAL |IDENTIFYING| |------------------|--------|-------------|----------|-----------| | 1 | DATE |schedule.Date|2017-01-01| TRUE | |JOB\_EXEC\_ID|工作 \_INST\_ID|开始 \_ 时间| END\_TIME |STATUS| |-------------|-------------|----------------|----------------|------| | 1 | 1 |2017-01-01 21:00|2017-01-01 21:30|FAILED| | |列名可能已被缩写或删除,以求清楚和
格式。| |---|---------------------------------------------------------------------------------------------| 现在工作失败了,假设花了一整夜的时间才确定问题,所以“批处理窗口”现在关闭了。进一步假设窗口在晚上 9:00 开始,工作将在 01-01 再次开始,从停止的地方开始,并在 9:30 成功完成。因为现在是第二天,所以也必须运行 01-02 作业,然后在 9:31 开始,并在 10:30 以正常的一小时时间完成。没有要求一个`JobInstance`被一个接一个地启动,除非这两个作业有可能试图访问相同的数据,从而导致数据库级别的锁定问题。完全由调度程序决定何时应该运行`Job`。由于它们是分开的`JobInstances`, Spring 批处理不会试图阻止它们同时运行。(当另一个人已经在运行`JobExecutionAlreadyRunningException`时,试图运行相同的`JobInstance`,结果会抛出一个`JobExecutionAlreadyRunningException`)。现在应该在`JobInstance`和`JobParameters`两个表中都有一个额外的条目,并且在`JobExecution`表中有两个额外的条目,如下表所示: |工作 \_INST\_ID| JOB\_NAME | |-------------|-----------| | 1 |EndOfDayJob| | 2 |EndOfDayJob| |JOB\_EXECUTION\_ID|TYPE\_CD| KEY\_NAME |日期 \_val|IDENTIFYING| |------------------|--------|-------------|-------------------|-----------| | 1 | DATE |schedule.Date|2017-01-01 00:00:00| TRUE | | 2 | DATE |schedule.Date|2017-01-01 00:00:00| TRUE | | 3 | DATE |schedule.Date|2017-01-02 00:00:00| TRUE | |JOB\_EXEC\_ID|工作 \_INST\_ID|开始 \_ 时间| END\_TIME | STATUS | |-------------|-------------|----------------|----------------|---------| | 1 | 1 |2017-01-01 21:00|2017-01-01 21:30| FAILED | | 2 | 1 |2017-01-02 21:00|2017-01-02 21:30|COMPLETED| | 3 | 2 |2017-01-02 21:31|2017-01-02 22:29|COMPLETED| | |列名可能已被缩写或删除,以求清楚和
格式。| |---|---------------------------------------------------------------------------------------------| ### [](#step)步骤 `Step`是一个域对象,它封装了批处理作业的一个独立的、连续的阶段。因此,每一项工作都完全由一个或多个步骤组成。a`Step`包含定义和控制实际批处理所需的所有信息。这必然是一个模糊的描述,因为任何给定的`Step`的内容都是由编写`Job`的开发人员自行决定的。a`Step`可以是简单的,也可以是复杂的,正如开发人员所希望的那样。简单的`Step`可能会将文件中的数据加载到数据库中,只需要很少或不需要代码(取决于使用的实现)。更复杂的`Step`可能具有复杂的业务规则,这些规则作为处理的一部分被应用。与`Job`一样,`Step`具有与唯一的`StepExecution`相关的个体`StepExecution`,如下图所示: ![图 2.1:带有步骤的工作层次结构](./images/jobHeirarchyWithSteps.png) 图 4。带有步骤的工作层次结构 #### [](#stepexecution)分步执行 a`StepExecution`表示试图执行`Step`的一次尝试。每次运行`Step`都会创建一个新的`StepExecution`,类似于`JobExecution`。但是,如果一个步骤由于它失败之前的步骤而无法执行,则不会对它执行持久化。只有当它的`Step`实际启动时,才会创建`StepExecution`。 `Step`执行由`StepExecution`类的对象表示。每个执行都包含对其相应步骤的引用和`JobExecution`以及与事务相关的数据,例如提交和回滚计数以及开始和结束时间。此外,每个步骤的执行都包含`ExecutionContext`,其中包含开发人员需要在批处理运行中保持的任何数据,例如重新启动所需的统计信息或状态信息。下表列出了`StepExecution`的属性: | Property |定义| |----------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | Status |指示执行状态的`BatchStatus`对象。运行时,
状态为`BatchStatus.STARTED`。如果失败,则状态为`BatchStatus.FAILED`。如果
成功完成,则状态为`BatchStatus.COMPLETED`。| | startTime |a`java.util.Date`表示开始执行的当前系统时间。
如果该步骤尚未开始,则此字段为空。| | endTime |a`java.util.Date`表示当前系统执行完成的时间,
不管是否成功。如果该步骤尚未
退出,则此字段为空。| | exitStatus |表示执行结果的`ExitStatus`。这是最重要的,因为
它包含一个返回给调用者的退出代码。有关更多详细信息,请参见第 5 章。
如果作业尚未退出,则此字段为空。| |executionContext|“属性袋”包含在
执行之间需要持久化的任何用户数据。| | readCount |已成功读取的项目的数量。| | writeCount |已成功写入的项目的数量。| | commitCount |已提交用于此执行的事务的数量。| | rollbackCount |由`Step`控制的业务事务被回滚
的次数。| | readSkipCount |失败的次数`read`,导致项目被跳过。| |processSkipCount|`process`失败的次数,导致项目被跳过。| | filterCount |已被`ItemProcessor`“过滤”的项数。| | writeSkipCount |失败的次数`write`,导致项目被跳过。| ### [](#executioncontext)ExecutionContext `ExecutionContext`表示一组键/值对的集合,这些键/值对由框架持久化并控制,以便允许开发人员有一个存储持久状态的位置,该状态的作用域为`StepExecution`对象或`JobExecution`对象。对于那些熟悉 Quartz 的人来说,它与 JobDataMap 非常相似。最好的使用示例是方便重新启动。以平面文件输入为例,在处理单个行时,该框架会在提交点周期性地保存`ExecutionContext`。这样做允许`ItemReader`存储其状态,以防在运行过程中发生致命错误,甚至断电。所需要的只是将当前读取的行数放入上下文中,如下面的示例所示,框架将完成其余的工作: ``` executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition()); ``` 使用`Job`刻板印象部分中的 Endofday 示例作为示例,假设有一个步骤“loaddata”将文件加载到数据库中。在第一次运行失败之后,元数据表将如下所示: |JOB\_INST\_ID| JOB\_NAME | |-------------|-----------| | 1 |EndOfDayJob| |JOB\_INST\_ID|TYPE\_CD| KEY\_NAME |DATE\_VAL | |-------------|--------|-------------|----------| | 1 | DATE |schedule.Date|2017-01-01| |JOB\_EXEC\_ID|JOB\_INST\_ID|开始 \_ 时间| END\_TIME |STATUS| |-------------|-------------|----------------|----------------|------| | 1 | 1 |2017-01-01 21:00|2017-01-01 21:30|FAILED| |STEP\_EXEC\_ID|JOB\_EXEC\_ID|STEP\_NAME|开始 \_ 时间| END\_TIME |STATUS| |--------------|-------------|----------|----------------|----------------|------| | 1 | 1 | loadData |2017-01-01 21:00|2017-01-01 21:30|FAILED| |STEP\_EXEC\_ID|短 \_ 上下文| |--------------|-------------------| | 1 |{piece.count=40321}| 在前一种情况下,`Step`运行了 30 分钟,处理了 40,321 个“片段”,这将表示此场景中文件中的行。这个值在框架每次提交之前进行更新,并且可以包含与`ExecutionContext`中的条目相对应的多行。在提交之前被通知需要各种`StepListener`实现中的一种(或`ItemStream`),这将在本指南的后面进行更详细的讨论。与前面的示例一样,假定`Job`在第二天重新启动。重新启动时,将从数据库重新构造上次运行的`ExecutionContext`中的值。当打开`ItemReader`时,它可以检查上下文中是否有任何存储状态,并从那里初始化自己,如以下示例所示: ``` if (executionContext.containsKey(getKey(LINES_READ_COUNT))) { log.debug("Initializing for restart. Restart data is: " + executionContext); long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT)); LineReader reader = getReader(); Object record = ""; while (reader.getPosition() < lineCount && record != null) { record = readLine(); } } ``` 在这种情况下,在上面的代码运行之后,当前行是 40,322,允许`Step`从它停止的地方重新开始。`ExecutionContext`还可以用于需要对运行本身进行持久化的统计信息。例如,如果一个平面文件包含跨多行的处理订单,则可能需要存储已处理的订单数量(这与读取的行数有很大不同),使一封电子邮件可以在`Step`的末尾发送,并在正文中处理订单的总数。框架为开发人员处理此存储,以便使用单独的`JobInstance`正确地对其进行范围设置。很难知道是否应该使用现有的`ExecutionContext`。例如,使用上面的“Endofday”示例,当 01-01 运行再次开始第二次时,框架识别出它是相同的`JobInstance`,并且在一个单独的`Step`基础上,将`ExecutionContext`从数据库中拉出,并将它(作为`StepExecution`的一部分)交给`Step`本身。相反,对于 01-02 运行,框架认识到它是一个不同的实例,因此必须将一个空上下文交给`Step`。框架为开发人员做出了许多此类决定,以确保在正确的时间将状态赋予他们。同样重要的是要注意,在任何给定的时间,每个`StepExecution`都存在一个`ExecutionContext`。`ExecutionContext`的客户端应该小心,因为这会创建一个共享密钥区。因此,在放入值时应该小心,以确保没有数据被覆盖。然而,`Step`在上下文中绝对不存储数据,因此没有办法对框架产生不利影响。 同样重要的是要注意,每`ExecutionContext`至少有一个`JobExecution`,每`StepExecution`至少有一个。例如,考虑以下代码片段: ``` ExecutionContext ecStep = stepExecution.getExecutionContext(); ExecutionContext ecJob = jobExecution.getExecutionContext(); //ecStep does not equal ecJob ``` 如注释中所指出的,`ecStep`不等于`ecJob`。它们是两个不同的`ExecutionContexts`。作用域为`Step`的一个被保存在`Step`中的每个提交点,而作用域为该作业的一个被保存在每个`Step`执行之间。 ### [](#jobrepository)JobRepository `JobRepository`是上述所有刻板印象的持久性机制。它为`JobLauncher`、`Job`和`Step`实现提供增删改查操作。当`Job`首次启动时,将从存储库获得`JobExecution`,并且在执行过程中,通过将`StepExecution`和`JobExecution`实现传递到存储库来持久化它们。 Spring 批处理 XML 命名空间提供了对配置带有``标记的`JobRepository`实例的支持,如以下示例所示: ``` ``` 当使用 Java 配置时,`@EnableBatchProcessing`注释提供了`JobRepository`作为自动配置的组件之一。 ### [](#joblauncher)joblauncher `JobLauncher`表示用于启动`Job`具有给定的`JobParameters`集的`Job`的简单接口,如以下示例所示: ``` public interface JobLauncher { public JobExecution run(Job job, JobParameters jobParameters) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException; } ``` 期望实现从`JobRepository`获得有效的`JobExecution`并执行`Job`。 ### [](#item-reader)条目阅读器 `ItemReader`是一种抽象,表示对`Step`输入的检索,每次检索一项。当`ItemReader`已经耗尽了它可以提供的项时,它通过返回`null`来表示这一点。有关`ItemReader`接口及其各种实现方式的更多详细信息,请参见[读者和作家](readersAndWriters.html#readersAndWriters)。 ### [](#item-writer)item writer `ItemWriter`是一种抽象,它表示`Step`的输出,一次输出一个批处理或一大块项目。通常,`ItemWriter`不知道下一步应该接收的输入,只知道当前调用中传递的项。有关`ItemWriter`接口及其各种实现方式的更多详细信息,请参见[读者和作家](readersAndWriters.html#readersAndWriters)。 ### [](#item-processor)项处理器 `ItemProcessor`是表示项目的业务处理的抽象。当`ItemReader`读取一个项,而`ItemWriter`写入它们时,`ItemProcessor`提供了一个接入点来转换或应用其他业务处理。如果在处理该项时确定该项无效,则返回`null`表示不应写出该项。有关`ItemProcessor`接口的更多详细信息,请参见[读者和作家](readersAndWriters.html#readersAndWriters)。 ### [](#batch-namespace)批处理名称空间 前面列出的许多域概念需要在 Spring `ApplicationContext`中进行配置。虽然有上述接口的实现方式可以在标准 Bean 定义中使用,但提供了一个名称空间以便于配置,如以下示例所示: ``` ``` 只要已声明批处理命名空间,就可以使用它的任何元素。有关配置作业的更多信息,请参见[配置和运行作业](job.html#configureJob)。有关配置`Step`的更多信息,请参见[配置一个步骤](step.html#configureStep)。