domain.md 23.6 KB
Newer Older
dallascao's avatar
dallascao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292
# 批处理的领域语言

## [](#domainLanguageOfBatch)批处理的域语言

XMLJavaBoth

对于任何有经验的批处理架构师来说, Spring 批处理中使用的批处理的总体概念应该是熟悉和舒适的。有“作业”和“步骤”以及开发人员提供的处理单元,分别称为`ItemReader``ItemWriter`。然而,由于 Spring 模式、操作、模板、回调和习惯用法,有以下机会:

* 在坚持明确区分关注事项方面有了显著改善。

* 清晰地描述了作为接口提供的体系结构层和服务。

* 简单和默认的实现,允许快速采用和易于使用的开箱即用。

* 显著增强了可扩展性。

下图是使用了几十年的批处理引用体系结构的简化版本。它提供了组成批处理领域语言的组件的概述。这个架构框架是一个蓝图,已经通过过去几代平台(COBOL/大型机、C/UNIX 和现在的 Java/Anywhere)上几十年的实现得到了证明。JCL 和 COBOL 开发人员可能与 C、C# 和 Java 开发人员一样熟悉这些概念。 Spring 批处理提供了通常在健壮的、可维护的系统中发现的层、组件和技术服务的物理实现,这些系统被用于解决创建简单到复杂的批处理应用程序,具有用于解决非常复杂的处理需求的基础设施和扩展。

![图 2.1:批处理原型](./images/spring-batch-reference-model.png)

图 1。批处理模式

前面的图表突出了构成 Spring 批处理的域语言的关键概念。一个作业有一个到多个步骤,每个步骤正好有一个`ItemReader`,一个`ItemProcessor`和一个`ItemWriter`。需要启动一个作业(使用`JobLauncher`),并且需要存储有关当前运行的进程的元数据(在`JobRepository`中)。

### [](#job)工作

这一部分描述了与批处理作业的概念有关的刻板印象。`Job`是封装整个批处理过程的实体。与其他 Spring 项目一样,`Job`与 XML 配置文件或基于 Java 的配置连接在一起。这种配置可以称为“作业配置”。然而,`Job`只是整个层次结构的顶部,如下图所示:

![工作层次结构](./images/job-heirarchy.png)

图 2。工作层次结构

在 Spring 批处理中,`Job`只是用于`Step`实例的容器。它将逻辑上属于一个流的多个步骤组合在一起,并允许将属性的全局配置用于所有步骤,例如可重启性。作业配置包含:

* 工作的简单名称。

* `Step`实例的定义和排序。

* 这份工作是否可以重新启动。

对于那些使用 Java 配置的人, Spring Batch 以`SimpleJob`类的形式提供了作业接口的默认实现,它在`Job`之上创建了一些标准功能。当使用基于 Java 的配置时,可以使用一个构建器集合来实例化`Job`,如以下示例所示:

```
@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .start(playerLoad())
                     .next(gameLoad())
                     .next(playerSummarization())
                     .build();
}
```

对于那些使用 XML 配置的人, Spring Batch 以`SimpleJob`类的形式提供了`Job`接口的默认实现,它在`Job`之上创建了一些标准功能。然而,批处理名称空间抽象出了直接实例化它的需要。相反,可以使用`<job>`元素,如以下示例所示:

```
<job id="footballJob">
    <step id="playerload" next="gameLoad"/>
    <step id="gameLoad" next="playerSummarization"/>
    <step id="playerSummarization"/>
</job>
```

#### [](#jobinstance)JobInstance

a`JobInstance`指的是逻辑作业运行的概念。考虑应该在一天结束时运行一次的批处理作业,例如前面图表中的“endofday”`Job`。有一个“endofday”作业,但是`Job`的每个单独运行都必须单独跟踪。在这种情况下,每天有一个逻辑`JobInstance`。例如,有一个 1 月 1 日运行,1 月 2 日运行,以此类推。如果 1 月 1 日运行第一次失败,并在第二天再次运行,它仍然是 1 月 1 日运行。(通常,这也对应于它正在处理的数据,这意味着 1 月 1 日运行处理 1 月 1 日的数据)。因此,每个`JobInstance`都可以有多个执行(`JobExecution`在本章后面更详细地讨论),并且只有一个`JobInstance`对应于特定的`Job`和标识`JobParameters`的执行可以在给定的时间运行。

`JobInstance`的定义与要加载的数据完全无关。这完全取决于`ItemReader`实现来确定如何加载数据。例如,在 Endofday 场景中,数据上可能有一个列,该列指示数据所属的“生效日期”或“计划日期”。因此,1 月 1 日的运行将只加载 1 日的数据,而 1 月 2 日的运行将只使用 2 日的数据。因为这个决定很可能是一个商业决定,所以它是由`ItemReader`来决定的。然而,使用相同的`JobInstance`确定是否使用来自先前执行的’状态’(即`ExecutionContext`,这将在本章后面讨论)。使用一个新的`JobInstance`表示“从开始”,而使用一个现有的实例通常表示“从你停止的地方开始”。

#### [](#jobparameters)JobParameters

在讨论了`JobInstance`以及它与约伯有何不同之后,我们自然要问的问题是:“一个`JobInstance`如何与另一个区分开来?”答案是:`JobParameters``JobParameters`对象持有一组用于启动批处理作业的参数。它们可以用于标识,甚至在运行过程中作为参考数据,如下图所示:

![作业参数](./images/job-stereotypes-parameters.png)

图 3。作业参数

在前面的示例中,有两个实例,一个用于 1 月 1 日,另一个用于 1 月 2 日,实际上只有一个`Job`,但它有两个`JobParameter`对象:一个以 01-01-2017 的作业参数启动,另一个以 01-02-2017 的参数启动。因此,契约可以定义为:`JobInstance`=`Job`+ 标识`JobParameters`。这允许开发人员有效地控制`JobInstance`的定义方式,因为他们控制传入的参数。

|   |并非所有作业参数都需要有助于识别`JobInstance`。在默认情况下,他们会这么做。但是,该框架还允许使用不影响`JobInstance`的恒等式的参数提交<br/>`Job`。|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

#### [](#jobexecution)jobexecution

a`JobExecution`指的是一次尝试运行作业的技术概念。一次执行可能以失败或成功结束,但除非执行成功完成,否则对应于给定执行的`JobInstance`不被认为是完成的。以前面描述的 Endofday`Job`为例,考虑第一次运行时失败的 01-01-2017 的`JobInstance`。如果以与第一次运行(01-01-2017)相同的标识作业参数再次运行,则会创建一个新的`JobExecution`。然而,仍然只有一个`JobInstance`

`Job`定义了什么是作业以及如何执行它,而`JobInstance`是一个纯粹的组织对象,用于将执行分组在一起,主要是为了启用正确的重新启动语义。但是,`JobExecution`是运行期间实际发生的事情的主要存储机制,并且包含许多必须控制和持久化的属性,如下表所示:

|    Property     |定义|
|-----------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|     Status      |一个`BatchStatus`对象,它指示执行的状态。在运行时,它是`BatchStatus#STARTED`。如果失败,则为`BatchStatus#FAILED`。如果成功完成<br/>,则为`BatchStatus#COMPLETED`|
|    startTime    |a`java.util.Date`表示开始执行的当前系统时间。<br/>如果作业尚未开始,则此字段为空。|
|     endTime     |a`java.util.Date`表示当前系统执行完成时的时间,<br/>不管是否成功。如果作业尚未<br/>完成,则该字段为空。|
|   exitStatus    |`ExitStatus`,表示运行的结果。它是最重要的,因为它<br/>包含一个返回给调用方的退出代码。有关更多详细信息,请参见第 5 章。如果作业尚未完成,则<br/>字段为空。|
|   createTime    |a`java.util.Date`表示当`JobExecution`<br/>第一次持续存在时的当前系统时间。作业可能尚未启动(因此没有启动时间),但是<br/>它总是有一个 CreateTime,这是管理作业级别`ExecutionContexts`的框架所要求的。|
|   lastUpdated   |a`java.util.Date`表示上次持久化 a`JobExecution`。如果作业尚未开始,则该字段<br/>为空。|
|executionContext |“属性袋”包含在<br/>执行之间需要持久化的任何用户数据。|
|failureExceptions|在执行`Job`时遇到的异常列表。如果在`Job`失败期间遇到了多个异常,则这些规则是有用的<br/>。|

这些属性很重要,因为它们是持久的,可以用来完全确定执行的状态。例如,如果 01-01 的 Endofday 作业在晚上 9:00 执行,并在 9:30 失败,则在批处理元数据表中创建以下条目:

|工作 \_INST\_ID| JOB\_NAME |
|-------------|-----------|
|      1      |EndOfDayJob|

|作业 \_ 执行 \_ID|TYPE\_CD|  KEY\_NAME  |DATE\_VAL |IDENTIFYING|
|------------------|--------|-------------|----------|-----------|
|        1         |  DATE  |schedule.Date|2017-01-01|   TRUE    |

|JOB\_EXEC\_ID|工作 \_INST\_ID|开始 \_ 时间|   END\_TIME    |STATUS|
|-------------|-------------|----------------|----------------|------|
|      1      |      1      |2017-01-01 21:00|2017-01-01 21:30|FAILED|

|   |列名可能已被缩写或删除,以求清楚和<br/>格式。|
|---|---------------------------------------------------------------------------------------------|

现在工作失败了,假设花了一整夜的时间才确定问题,所以“批处理窗口”现在关闭了。进一步假设窗口在晚上 9:00 开始,工作将在 01-01 再次开始,从停止的地方开始,并在 9:30 成功完成。因为现在是第二天,所以也必须运行 01-02 作业,然后在 9:31 开始,并在 10:30 以正常的一小时时间完成。没有要求一个`JobInstance`被一个接一个地启动,除非这两个作业有可能试图访问相同的数据,从而导致数据库级别的锁定问题。完全由调度程序决定何时应该运行`Job`。由于它们是分开的`JobInstances`, Spring 批处理不会试图阻止它们同时运行。(当另一个人已经在运行`JobExecutionAlreadyRunningException`时,试图运行相同的`JobInstance`,结果会抛出一个`JobExecutionAlreadyRunningException`)。现在应该在`JobInstance``JobParameters`两个表中都有一个额外的条目,并且在`JobExecution`表中有两个额外的条目,如下表所示:

|工作 \_INST\_ID| JOB\_NAME |
|-------------|-----------|
|      1      |EndOfDayJob|
|      2      |EndOfDayJob|

|JOB\_EXECUTION\_ID|TYPE\_CD|  KEY\_NAME  |日期 \_val|IDENTIFYING|
|------------------|--------|-------------|-------------------|-----------|
|        1         |  DATE  |schedule.Date|2017-01-01 00:00:00|   TRUE    |
|        2         |  DATE  |schedule.Date|2017-01-01 00:00:00|   TRUE    |
|        3         |  DATE  |schedule.Date|2017-01-02 00:00:00|   TRUE    |

|JOB\_EXEC\_ID|工作 \_INST\_ID|开始 \_ 时间|   END\_TIME    | STATUS  |
|-------------|-------------|----------------|----------------|---------|
|      1      |      1      |2017-01-01 21:00|2017-01-01 21:30| FAILED  |
|      2      |      1      |2017-01-02 21:00|2017-01-02 21:30|COMPLETED|
|      3      |      2      |2017-01-02 21:31|2017-01-02 22:29|COMPLETED|

|   |列名可能已被缩写或删除,以求清楚和<br/>格式。|
|---|---------------------------------------------------------------------------------------------|

### [](#step)步骤

`Step`是一个域对象,它封装了批处理作业的一个独立的、连续的阶段。因此,每一项工作都完全由一个或多个步骤组成。a`Step`包含定义和控制实际批处理所需的所有信息。这必然是一个模糊的描述,因为任何给定的`Step`的内容都是由编写`Job`的开发人员自行决定的。a`Step`可以是简单的,也可以是复杂的,正如开发人员所希望的那样。简单的`Step`可能会将文件中的数据加载到数据库中,只需要很少或不需要代码(取决于使用的实现)。更复杂的`Step`可能具有复杂的业务规则,这些规则作为处理的一部分被应用。与`Job`一样,`Step`具有与唯一的`StepExecution`相关的个体`StepExecution`,如下图所示:

![图 2.1:带有步骤的工作层次结构](./images/jobHeirarchyWithSteps.png)

图 4。带有步骤的工作层次结构

#### [](#stepexecution)分步执行

a`StepExecution`表示试图执行`Step`的一次尝试。每次运行`Step`都会创建一个新的`StepExecution`,类似于`JobExecution`。但是,如果一个步骤由于它失败之前的步骤而无法执行,则不会对它执行持久化。只有当它的`Step`实际启动时,才会创建`StepExecution`

`Step`执行由`StepExecution`类的对象表示。每个执行都包含对其相应步骤的引用和`JobExecution`以及与事务相关的数据,例如提交和回滚计数以及开始和结束时间。此外,每个步骤的执行都包含`ExecutionContext`,其中包含开发人员需要在批处理运行中保持的任何数据,例如重新启动所需的统计信息或状态信息。下表列出了`StepExecution`的属性:

|    Property    |定义|
|----------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|     Status     |指示执行状态的`BatchStatus`对象。运行时,<br/>状态为`BatchStatus.STARTED`。如果失败,则状态为`BatchStatus.FAILED`。如果<br/>成功完成,则状态为`BatchStatus.COMPLETED`。|
|   startTime    |a`java.util.Date`表示开始执行的当前系统时间。<br/>如果该步骤尚未开始,则此字段为空。|
|    endTime     |a`java.util.Date`表示当前系统执行完成的时间,<br/>不管是否成功。如果该步骤尚未<br/>退出,则此字段为空。|
|   exitStatus   |表示执行结果的`ExitStatus`。这是最重要的,因为<br/>它包含一个返回给调用者的退出代码。有关更多详细信息,请参见第 5 章。<br/>如果作业尚未退出,则此字段为空。|
|executionContext|“属性袋”包含在<br/>执行之间需要持久化的任何用户数据。|
|   readCount    |已成功读取的项目的数量。|
|   writeCount   |已成功写入的项目的数量。|
|  commitCount   |已提交用于此执行的事务的数量。|
| rollbackCount  |由`Step`控制的业务事务被回滚<br/>的次数。|
| readSkipCount  |失败的次数`read`,导致项目被跳过。|
|processSkipCount|`process`失败的次数,导致项目被跳过。|
|  filterCount   |已被`ItemProcessor`“过滤”的项数。|
| writeSkipCount |失败的次数`write`,导致项目被跳过。|

### [](#executioncontext)ExecutionContext

`ExecutionContext`表示一组键/值对的集合,这些键/值对由框架持久化并控制,以便允许开发人员有一个存储持久状态的位置,该状态的作用域为`StepExecution`对象或`JobExecution`对象。对于那些熟悉 Quartz 的人来说,它与 JobDataMap 非常相似。最好的使用示例是方便重新启动。以平面文件输入为例,在处理单个行时,该框架会在提交点周期性地保存`ExecutionContext`。这样做允许`ItemReader`存储其状态,以防在运行过程中发生致命错误,甚至断电。所需要的只是将当前读取的行数放入上下文中,如下面的示例所示,框架将完成其余的工作:

```
executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());
```

使用`Job`刻板印象部分中的 Endofday 示例作为示例,假设有一个步骤“loaddata”将文件加载到数据库中。在第一次运行失败之后,元数据表将如下所示:

|JOB\_INST\_ID| JOB\_NAME |
|-------------|-----------|
|      1      |EndOfDayJob|

|JOB\_INST\_ID|TYPE\_CD|  KEY\_NAME  |DATE\_VAL |
|-------------|--------|-------------|----------|
|      1      |  DATE  |schedule.Date|2017-01-01|

|JOB\_EXEC\_ID|JOB\_INST\_ID|开始 \_ 时间|   END\_TIME    |STATUS|
|-------------|-------------|----------------|----------------|------|
|      1      |      1      |2017-01-01 21:00|2017-01-01 21:30|FAILED|

|STEP\_EXEC\_ID|JOB\_EXEC\_ID|STEP\_NAME|开始 \_ 时间|   END\_TIME    |STATUS|
|--------------|-------------|----------|----------------|----------------|------|
|      1       |      1      | loadData |2017-01-01 21:00|2017-01-01 21:30|FAILED|

|STEP\_EXEC\_ID|短 \_ 上下文|
|--------------|-------------------|
|      1       |{piece.count=40321}|

在前一种情况下,`Step`运行了 30 分钟,处理了 40,321 个“片段”,这将表示此场景中文件中的行。这个值在框架每次提交之前进行更新,并且可以包含与`ExecutionContext`中的条目相对应的多行。在提交之前被通知需要各种`StepListener`实现中的一种(或`ItemStream`),这将在本指南的后面进行更详细的讨论。与前面的示例一样,假定`Job`在第二天重新启动。重新启动时,将从数据库重新构造上次运行的`ExecutionContext`中的值。当打开`ItemReader`时,它可以检查上下文中是否有任何存储状态,并从那里初始化自己,如以下示例所示:

```
if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
    log.debug("Initializing for restart. Restart data is: " + executionContext);

    long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));

    LineReader reader = getReader();

    Object record = "";
    while (reader.getPosition() < lineCount && record != null) {
        record = readLine();
    }
}
```

在这种情况下,在上面的代码运行之后,当前行是 40,322,允许`Step`从它停止的地方重新开始。`ExecutionContext`还可以用于需要对运行本身进行持久化的统计信息。例如,如果一个平面文件包含跨多行的处理订单,则可能需要存储已处理的订单数量(这与读取的行数有很大不同),使一封电子邮件可以在`Step`的末尾发送,并在正文中处理订单的总数。框架为开发人员处理此存储,以便使用单独的`JobInstance`正确地对其进行范围设置。很难知道是否应该使用现有的`ExecutionContext`。例如,使用上面的“Endofday”示例,当 01-01 运行再次开始第二次时,框架识别出它是相同的`JobInstance`,并且在一个单独的`Step`基础上,将`ExecutionContext`从数据库中拉出,并将它(作为`StepExecution`的一部分)交给`Step`本身。相反,对于 01-02 运行,框架认识到它是一个不同的实例,因此必须将一个空上下文交给`Step`。框架为开发人员做出了许多此类决定,以确保在正确的时间将状态赋予他们。同样重要的是要注意,在任何给定的时间,每个`StepExecution`都存在一个`ExecutionContext``ExecutionContext`的客户端应该小心,因为这会创建一个共享密钥区。因此,在放入值时应该小心,以确保没有数据被覆盖。然而,`Step`在上下文中绝对不存储数据,因此没有办法对框架产生不利影响。

同样重要的是要注意,每`ExecutionContext`至少有一个`JobExecution`,每`StepExecution`至少有一个。例如,考虑以下代码片段:

```
ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob
```

如注释中所指出的,`ecStep`不等于`ecJob`。它们是两个不同的`ExecutionContexts`。作用域为`Step`的一个被保存在`Step`中的每个提交点,而作用域为该作业的一个被保存在每个`Step`执行之间。

### [](#jobrepository)JobRepository

`JobRepository`是上述所有刻板印象的持久性机制。它为`JobLauncher``Job``Step`实现提供增删改查操作。当`Job`首次启动时,将从存储库获得`JobExecution`,并且在执行过程中,通过将`StepExecution``JobExecution`实现传递到存储库来持久化它们。

Spring 批处理 XML 命名空间提供了对配置带有`<job-repository>`标记的`JobRepository`实例的支持,如以下示例所示:

```
<job-repository id="jobRepository"/>
```

当使用 Java 配置时,`@EnableBatchProcessing`注释提供了`JobRepository`作为自动配置的组件之一。

### [](#joblauncher)joblauncher

`JobLauncher`表示用于启动`Job`具有给定的`JobParameters`集的`Job`的简单接口,如以下示例所示:

```
public interface JobLauncher {

public JobExecution run(Job job, JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException,
                   JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
```

期望实现从`JobRepository`获得有效的`JobExecution`并执行`Job`

### [](#item-reader)条目阅读器

`ItemReader`是一种抽象,表示对`Step`输入的检索,每次检索一项。当`ItemReader`已经耗尽了它可以提供的项时,它通过返回`null`来表示这一点。有关`ItemReader`接口及其各种实现方式的更多详细信息,请参见[读者和作家](readersAndWriters.html#readersAndWriters)

### [](#item-writer)item writer

`ItemWriter`是一种抽象,它表示`Step`的输出,一次输出一个批处理或一大块项目。通常,`ItemWriter`不知道下一步应该接收的输入,只知道当前调用中传递的项。有关`ItemWriter`接口及其各种实现方式的更多详细信息,请参见[读者和作家](readersAndWriters.html#readersAndWriters)

### [](#item-processor)项处理器

`ItemProcessor`是表示项目的业务处理的抽象。当`ItemReader`读取一个项,而`ItemWriter`写入它们时,`ItemProcessor`提供了一个接入点来转换或应用其他业务处理。如果在处理该项时确定该项无效,则返回`null`表示不应写出该项。有关`ItemProcessor`接口的更多详细信息,请参见[读者和作家](readersAndWriters.html#readersAndWriters)

### [](#batch-namespace)批处理名称空间

前面列出的许多域概念需要在 Spring `ApplicationContext`中进行配置。虽然有上述接口的实现方式可以在标准 Bean 定义中使用,但提供了一个名称空间以便于配置,如以下示例所示:

```
<beans:beans xmlns="http://www.springframework.org/schema/batch"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
   http://www.springframework.org/schema/beans
   https://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/batch
   https://www.springframework.org/schema/batch/spring-batch.xsd">

<job id="ioSampleJob">
    <step id="step1">
        <tasklet>
            <chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
        </tasklet>
    </step>
</job>

</beans:beans>
```

只要已声明批处理命名空间,就可以使用它的任何元素。有关配置作业的更多信息,请参见[配置和运行作业](job.html#configureJob)。有关配置`Step`的更多信息,请参见[配置一个步骤](step.html#configureStep)