提交 90348f31 编写于 作者: 茶陵後's avatar 茶陵後 👍

#27 spring batch 基本格式审核调整

上级 7f66f8fc
......@@ -532,6 +532,35 @@ module.exports = {
initialOpenGroupIndex: 0 // 可选的, 默认值是 0
}
],
'/spring-batch/': [
{
title: 'Spring Batch 文档',
sidebarDepth: 2,
collapsable: false,
children: [
"/spring-batch/spring-batch-intro.md",
"/spring-batch/whatsnew.md",
"/spring-batch/domain.md",
"/spring-batch/job.md",
"/spring-batch/step.md",
"/spring-batch/readersAndWriters.md",
"/spring-batch/processor.md",
"/spring-batch/scalability.md",
"/spring-batch/repeat.md",
"/spring-batch/retry.md",
"/spring-batch/testing.md",
"/spring-batch/common-patterns.md",
"/spring-batch/jsr-352.md",
"/spring-batch/spring-batch-integration.md",
"/spring-batch/monitoring-and-metrics.md",
"/spring-batch/appendix.md",
"/spring-batch/schema-appendix.md",
"/spring-batch/transaction-appendix.md",
"/spring-batch/glossary.md"
],
initialOpenGroupIndex: 0 // 可选的, 默认值是 0
}
],
'/spring-amqp/': [
{
title: 'Spring AMQP 文档',
......
# Spring Batch
\ No newline at end of file
## 附录 A:条目阅读器和条目编写器列表
### 条目阅读器
| Item Reader |说明|
|----------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|AbstractItemCountingItemStreamItemReader|抽象基类,通过计算从<br/>`ItemReader`返回的项数,提供基本的<br/>重启功能。|
| AggregateItemReader |一个`ItemReader`提供一个列表作为其<br/>项,存储来自注入的`ItemReader`的对象,直到它们<br/>准备好作为集合打包。必须将这个类<br/>用作自定义`ItemReader`的包装器,该包装器可以标识记录<br/>的边界。自定义读取器应该通过返回一个`AggregateItem`来标记<br/>记录的开始和结束,它将对其`true`查询方法<br/>`isFooter()`进行响应。请注意,这个阅读器<br/>不是 Spring 批<br/>提供的阅读器库的一部分,而是作为`spring-batch-samples`中的示例给出的。|
| AmqpItemReader |给定一个 Spring `AmqpTemplate`,它提供了<br/>同步接收方法。`receiveAndConvert()`方法<br/>允许你接收 POJO 对象。|
| KafkaItemReader |从 Apache Kafka 主题读取消息的`ItemReader`<br/>可以将其配置为从同一主题的多个分区读取消息。<br/>此阅读器在执行上下文中存储消息偏移量,以支持重新启动功能。|
| FlatFileItemReader |从平面文件读取。包括`ItemStream``Skippable`功能。参见[`FlatFileItemReader`]。|
| HibernateCursorItemReader |基于 HQL 查询从游标读取。参见[`Cursor-based ItemReaders`]。|
| HibernatePagingItemReader |从分页的 HQL 查询中读取|
| ItemReaderAdapter |将任何类调整为`ItemReader`接口。|
| JdbcCursorItemReader |通过 JDBC 从数据库游标读取数据。参见[`Cursor-based ItemReaders`]。|
| JdbcPagingItemReader |给定一个 SQL 语句,在行中进行分页<br/>,这样就可以在不耗尽<br/>内存的情况下读取大型数据集。|
| JmsItemReader |给定一个 Spring `JmsOperations`对象和一个 JMS<br/>向其发送错误的目的地或目的地名称,提供通过注入的项<br/>接收到的`JmsOperations#receive()`方法。|
| JpaPagingItemReader |给定一个 JPQL 语句,通过<br/>行进行分页,这样就可以在不耗尽<br/>内存的情况下读取大型数据集。|
| ListItemReader |提供列表中的项,在<br/>时间提供一个列表。|
| MongoItemReader |给定一个`MongoOperations`对象和一个基于 JSON 的 MongoDB<br/>查询,提供从`MongoOperations#find()`方法接收的项。|
| Neo4jItemReader |给定一个`Neo4jOperations`对象和一个<br/>Cyhper 查询的组件,返回的项是 NEO4jOperations.query<br/>方法的结果。|
| RepositoryItemReader |给定一个 Spring data`PagingAndSortingRepository`对象,<br/>a`Sort`,以及要执行的方法的名称,返回由<br/> Spring 数据存储库实现的项。|
| StoredProcedureItemReader |从执行数据库存储过程的<br/>所产生的数据库游标读取数据。参见[`StoredProcedureItemReader`]|
| StaxEventItemReader |通过 stax 进行读取,参见[`StaxEventItemReader`]。|
| JsonItemReader |从 JSON 文档中读取项目。参见[`JsonItemReader`]。|
### 条目编写者
| Item Writer |说明|
|--------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| AbstractItemStreamItemWriter |组合`ItemStream``ItemWriter`接口的抽象基类。|
| AmqpItemWriter |给定 Spring `AmqpTemplate`,它为同步`send`方法提供了<br/>`convertAndSend(Object)`方法允许你发送 POJO 对象。|
| CompositeItemWriter |在注入`List``ItemWriter`对象中,将一个项传递给每个<br/>`write`方法。|
| FlatFileItemWriter |写入平面文件。包括`ItemStream`<br/>可跳过的功能。参见[`FlatFileItemWriter`]。|
| GemfireItemWriter |使用`GemfireOperations`对象,可以根据 delete<br/>标志的配置从 Gemfire 实例中写入<br/>项。|
| HibernateItemWriter |这个条目编写器 Hibernate-会话知道<br/>,并处理一些与事务相关的工作,而非“ Hibernate-知道”<br/>的条目编写器不需要了解这些工作,然后将<br/>委托给另一个条目编写器来进行实际的写作。|
| ItemWriterAdapter |将任何类调整为`ItemWriter`接口。|
| JdbcBatchItemWriter |使用来自`PreparedStatement`(如果可用)的批处理功能,并且可以<br/>`flush`期间采取基本步骤来定位故障。|
| JmsItemWriter |使用`JmsOperations`对象,通过`JmsOperations#convertAndSend()`方法将项<br/>写入默认队列。|
| JpaItemWriter |这个条目编写器是 JPA EntityManager-aware 的<br/>,并处理一些与事务相关的工作,而非“ JPA-aware”`ItemWriter`不需要了解这些工作,并且<br/>然后将其委托给另一个编写器来进行实际的编写。|
| KafkaItemWriter |使用`KafkaTemplate`对象,通过`KafkaTemplate#sendDefault(Object, Object)`方法将项写入默认主题,并使用`Converter`来映射该项的键。<br/>还可以配置一个 delete 标志,以将 delete 事件发送到该主题。|
| MimeMessageItemWriter |使用 Spring 的`JavaMailSender`,类型`MimeMessage`的项作为邮件发送。|
| MongoItemWriter |给定一个`MongoOperations`对象,通过`MongoOperations.save(Object)`方法写项目<br/>。实际的写入被延迟<br/>,直到事务提交之前的最后一个可能时刻。|
| Neo4jItemWriter |给定一个`Neo4jOperations`对象,项目将通过`save(Object)`方法持久化,或者根据`ItemWriter’s`配置通过`delete(Object)`方法删除。|
|PropertyExtractingDelegatingItemWriter|扩展`AbstractMethodInvokingDelegator`动态创建参数。参数是通过从要处理的项中的字段(通过`SpringBeanWrapper`)检索<br/>中的值来创建的,基于注入的字段<br/>名称数组。|
| RepositoryItemWriter |给定一个 Spring 数据`CrudRepository`的实现,<br/>项是通过在配置中指定的方法保存的。|
| StaxEventItemWriter |使用`Marshaller`实现<br/>将每个项转换为 XML,然后使用<br/>stax 将其写入 XML 文件。|
|jsonfileitemwriter| 使用`JsonObjectMarshaller`实现将每个项转换为 JSON,然后将其写入 JSON 文件。
\ No newline at end of file
此差异已折叠。
此差异已折叠。
# 词汇表
## 附录 A:术语表
### Spring 批处理术语表
批处理
随着时间的推移,商业交易的积累。
批处理应用程序样式
用于将批处理指定为一种独立的应用程序风格,类似于在线、Web 或 SOA。它具有输入、验证、信息到业务模型的转换、业务处理和输出的标准元素。此外,它还需要宏观层面的监控。
批处理
在一段时间内(如一小时、一天、一周、一个月或一年)积累的大量业务交易的处理。它是一个过程或一组过程以重复和可预测的方式应用于许多数据实体或对象,不需要手动元素,也不需要单独的手动元素进行错误处理。
批处理窗口
批处理作业必须完成的时间范围。这可能会受到其他联机系统、其他需要执行的依赖作业或批处理环境特有的其他因素的限制。
步骤
主要的批处理任务或工作单元.它根据提交间隔设置和其他因素初始化业务逻辑并控制事务环境。
Tasklet
由应用程序开发人员创建的用于处理某一步骤的业务逻辑的组件。
批处理作业类型
作业类型描述了针对特定类型的处理的作业应用。常见的领域是接口处理(通常是平面文件)、表单处理(用于在线 PDF 生成或打印格式)和报告处理。
驾驶查询
驾驶查询标识了一项工作要做的一组工作。然后,这份工作将这份工作分解为各个工作单元。例如,一个驱动查询可能是要识别所有具有“挂起传输”状态的金融交易,并将它们发送到合作伙伴系统。驱动查询返回一组要处理的记录 ID。然后,每个记录 ID 就成为一个工作单位。驱动查询可能涉及一个连接(如果选择的条件跨越两个或更多个表),也可能与单个表一起工作。
项目
项表示用于处理的完整数据的最小量。用最简单的术语来说,这可能是文件中的一行,数据库表中的一行,或者 XML 文件中的特定元素。
逻辑工作单位
批处理作业通过驱动查询(或其他输入源,例如文件)进行迭代,以执行作业必须完成的一组工作。所执行的工作的每一次迭代都是一个工作单位。
提交间隔
在单个事务中处理的一组 LUW。
划分
将作业拆分成多个线程,其中每个线程负责要处理的整个数据的一个子集。执行线程可以在相同的 JVM 中,也可以在支持工作负载平衡的集群环境中跨越 JVM。
分段表
在处理临时数据时保存临时数据的表。
可重启
一种可以再次执行的作业,并假定初始运行时具有相同的标识。换句话说,它具有相同的作业实例 ID。
可重排
可以重新启动并根据前一次运行的记录处理管理自己的状态的作业。一个可重新运行的步骤的例子是一个基于驱动查询的步骤。如果可以形成驱动查询,以便在重新启动作业时限制已处理的行,则可以重新运行该查询。这是由应用程序逻辑管理的。通常,在`where`语句中添加一个条件,以限制驱动查询返回的行,其逻辑类似于“and processedFlag!=true”。
重复
批处理的最基本单元之一,它通过可重复性来定义调用代码的一部分,直到完成并且没有错误为止。通常,只要有输入,批处理过程就是可重复的。
重试
使用与处理事务输出异常最常关联的重试语义简化了操作的执行。Retry 与 Repeat 略有不同,Retry 是有状态的,并以相同的输入连续调用相同的代码块,直到它成功或超出某种类型的重试限制。只有当由于环境中的某些东西得到了改进,操作的后续调用才可能成功时,它才是通常有用的。
恢复
恢复操作以这样一种方式处理异常,即重复进程能够继续。
斯基普
跳过是一种恢复策略,通常用于文件输入源,作为忽略无效验证的错误输入记录的策略。
\ No newline at end of file
此差异已折叠。
此差异已折叠。
# 监测和量度
## 监控和度量
自版本 4.2 以来, Spring Batch 提供了对基于[Micrometer](https://micrometer.io/)的批监视和度量的支持。本节描述了哪些度量是开箱即用的,以及如何贡献自定义度量。
### 内置度量
度量集合不需要任何特定的配置。框架提供的所有指标都注册在[千分尺的全球注册中心](https://micrometer.io/docs/concepts#_global_registry)`spring.batch`前缀下。下表详细解释了所有指标:
| *Metric Name* | *Type* | *Description* |*标签*|
|---------------------------|-----------------|---------------------------|---------------------------------|
| `spring.batch.job` | `TIMER` | Duration of job execution |`name`, `status`|
| `spring.batch.job.active` |`LONG_TASK_TIMER`| Currently active jobs |`name`|
| `spring.batch.step` | `TIMER` |Duration of step execution |`name`, `job.name`, `status`|
| `spring.batch.item.read` | `TIMER` | Duration of item reading |`job.name`, `step.name`, `status`|
|`spring.batch.item.process`| `TIMER` |Duration of item processing|`job.name`, `step.name`, `status`|
|`spring.batch.chunk.write` | `TIMER` | Duration of chunk writing |`job.name`, `step.name`, `status`|
| |`status`标记可以是`SUCCESS``FAILURE`。|
|---|------------------------------------------------------|
### 自定义度量
如果你想在自定义组件中使用自己的度量,我们建议直接使用 Micrometer API。以下是如何对`Tasklet`进行计时的示例:
```
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
public class MyTimedTasklet implements Tasklet {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
Timer.Sample sample = Timer.start(Metrics.globalRegistry);
String status = "success";
try {
// do some work
} catch (Exception e) {
// handle exception
status = "failure";
} finally {
sample.stop(Timer.builder("my.tasklet.timer")
.description("Duration of MyTimedTasklet")
.tag("status", status)
.register(Metrics.globalRegistry));
}
return RepeatStatus.FINISHED;
}
}
```
### 禁用度量
度量收集是一个类似于日志记录的问题。禁用日志通常是通过配置日志记录库来完成的,对于度量标准来说也是如此。在 Spring 批处理中没有禁用千分尺的度量的功能,这应该在千分尺的一侧完成。由于 Spring 批处理将度量存储在带有`spring.batch`前缀的 Micrometer 的全局注册中心中,因此可以通过以下代码片段将 Micrometer 配置为忽略/拒绝批处理度量:
```
Metrics.globalRegistry.config().meterFilter(MeterFilter.denyNameStartsWith("spring.batch"))
```
有关更多详情,请参阅千分尺的[参考文献](http://micrometer.io/docs/concepts#_meter_filters)
\ No newline at end of file
# 项目处理
## 项处理
XMLJavaBoth
[ItemReader 和 ItemWriter 接口](readersAndWriters.html#readersAndWriters)对于它们的特定任务都非常有用,但是如果你想在编写之前插入业务逻辑呢?读和写的一个选项是使用复合模式:创建一个`ItemWriter`,其中包含另一个`ItemWriter`或一个`ItemReader`,其中包含另一个`ItemReader`。下面的代码展示了一个示例:
```
public class CompositeItemWriter<T> implements ItemWriter<T> {
ItemWriter<T> itemWriter;
public CompositeItemWriter(ItemWriter<T> itemWriter) {
this.itemWriter = itemWriter;
}
public void write(List<? extends T> items) throws Exception {
//Add business logic here
itemWriter.write(items);
}
public void setDelegate(ItemWriter<T> itemWriter){
this.itemWriter = itemWriter;
}
}
```
前面的类包含另一个`ItemWriter`,它在提供了一些业务逻辑之后将其委托给它。这种模式也可以很容易地用于`ItemReader`,也许可以基于由主`ItemReader`提供的输入来获得更多的引用数据。如果你需要自己控制对`write`的调用,它也很有用。但是,如果你只想在实际写入之前“转换”传入的用于写入的项,则不需要`write`你自己。你只需修改项目即可。对于此场景, Spring Batch 提供了`ItemProcessor`接口,如下面的接口定义所示:
```
public interface ItemProcessor<I, O> {
O process(I item) throws Exception;
}
```
`ItemProcessor`很简单。给定一个对象,将其转换并返回另一个对象。所提供的对象可以是相同类型的,也可以不是相同类型的。关键在于,业务逻辑可以应用于流程中,完全由开发人员来创建该逻辑。`ItemProcessor`可以直接连接到一个步骤。例如,假设`ItemReader`提供了类型`Foo`的类,并且在写出之前需要将其转换为类型`Bar`。下面的示例显示了执行转换的`ItemProcessor`:
```
public class Foo {}
public class Bar {
public Bar(Foo foo) {}
}
public class FooProcessor implements ItemProcessor<Foo, Bar> {
public Bar process(Foo foo) throws Exception {
//Perform simple transformation, convert a Foo to a Bar
return new Bar(foo);
}
}
public class BarWriter implements ItemWriter<Bar> {
public void write(List<? extends Bar> bars) throws Exception {
//write bars
}
}
```
在前面的示例中,有一个类`Foo`,一个类`Bar`,以及一个类`FooProcessor`,它坚持`ItemProcessor`接口。转换很简单,但是任何类型的转换都可以在这里完成。`BarWriter``Bar`对象,如果提供了任何其他类型,则抛出异常。类似地,如果只提供了`Foo`,则`FooProcessor`抛出异常。然后可以将`FooProcessor`注入`Step`,如下例所示:
XML 配置
```
<job id="ioSampleJob">
<step name="step1">
<tasklet>
<chunk reader="fooReader" processor="fooProcessor" writer="barWriter"
commit-interval="2"/>
</tasklet>
</step>
</job>
```
Java 配置
```
@Bean
public Job ioSampleJob() {
return this.jobBuilderFactory.get("ioSampleJob")
.start(step1())
.build();
}
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<Foo, Bar>chunk(2)
.reader(fooReader())
.processor(fooProcessor())
.writer(barWriter())
.build();
}
```
`ItemProcessor``ItemReader``ItemWriter`之间的区别在于,`ItemProcessor`对于`Step`是可选的。
### 链接项目处理器
在许多场景中,执行单个转换是有用的,但是如果你想将多个`ItemProcessor`实现“链”在一起,该怎么办?这可以使用前面提到的复合模式来完成。为了更新前面的单个转换,例如,将`Foo`转换为`Bar`,将其转换为`Foobar`并写出,如以下示例所示:
```
public class Foo {}
public class Bar {
public Bar(Foo foo) {}
}
public class Foobar {
public Foobar(Bar bar) {}
}
public class FooProcessor implements ItemProcessor<Foo, Bar> {
public Bar process(Foo foo) throws Exception {
//Perform simple transformation, convert a Foo to a Bar
return new Bar(foo);
}
}
public class BarProcessor implements ItemProcessor<Bar, Foobar> {
public Foobar process(Bar bar) throws Exception {
return new Foobar(bar);
}
}
public class FoobarWriter implements ItemWriter<Foobar>{
public void write(List<? extends Foobar> items) throws Exception {
//write items
}
}
```
a`FooProcessor`和 a`BarProcessor`可以’链接’在一起,以得到结果`Foobar`,如以下示例所示:
```
CompositeItemProcessor<Foo,Foobar> compositeProcessor =
new CompositeItemProcessor<Foo,Foobar>();
List itemProcessors = new ArrayList();
itemProcessors.add(new FooProcessor());
itemProcessors.add(new BarProcessor());
compositeProcessor.setDelegates(itemProcessors);
```
正如前面的示例一样,复合处理器可以配置为`Step`:
XML 配置
```
<job id="ioSampleJob">
<step name="step1">
<tasklet>
<chunk reader="fooReader" processor="compositeItemProcessor" writer="foobarWriter"
commit-interval="2"/>
</tasklet>
</step>
</job>
<bean id="compositeItemProcessor"
class="org.springframework.batch.item.support.CompositeItemProcessor">
<property name="delegates">
<list>
<bean class="..FooProcessor" />
<bean class="..BarProcessor" />
</list>
</property>
</bean>
```
Java 配置
```
@Bean
public Job ioSampleJob() {
return this.jobBuilderFactory.get("ioSampleJob")
.start(step1())
.build();
}
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<Foo, Foobar>chunk(2)
.reader(fooReader())
.processor(compositeProcessor())
.writer(foobarWriter())
.build();
}
@Bean
public CompositeItemProcessor compositeProcessor() {
List<ItemProcessor> delegates = new ArrayList<>(2);
delegates.add(new FooProcessor());
delegates.add(new BarProcessor());
CompositeItemProcessor processor = new CompositeItemProcessor();
processor.setDelegates(delegates);
return processor;
}
```
### 过滤记录
项目处理器的一个典型用途是在将记录传递给`ItemWriter`之前过滤掉它们。过滤是一种不同于跳过的动作。跳过表示记录无效,而筛选只表示不应写入记录。
例如,考虑一个批处理作业,它读取包含三种不同类型记录的文件:要插入的记录、要更新的记录和要删除的记录。如果系统不支持记录删除,那么我们将不希望将任何“delete”记录发送到`ItemWriter`。但是,由于这些记录实际上并不是不良记录,我们希望过滤掉它们,而不是跳过它们。因此,`ItemWriter`将只接收“插入”和“更新”记录。
要过滤记录,可以从`ItemProcessor`返回`null`。该框架检测到结果是`null`,并避免将该项添加到交付给`ItemWriter`的记录列表中。像往常一样,从`ItemProcessor`抛出的异常会导致跳过。
### 验证输入
[项目阅读器和项目编写器](readersAndWriters.html#readersAndWriters)章中,讨论了多种解析输入的方法。如果不是“格式良好”的,每个主要实现都会抛出一个异常。如果缺少数据范围,`FixedLengthTokenizer`将抛出一个异常。类似地,试图访问`RowMapper``FieldSetMapper`中不存在或格式与预期不同的索引,会引发异常。所有这些类型的异常都是在`read`返回之前抛出的。但是,它们没有解决返回的项目是否有效的问题。例如,如果其中一个字段是年龄,那么它显然不可能是负的。它可以正确地解析,因为它存在并且是一个数字,但是它不会导致异常。由于已经有过多的验证框架, Spring Batch 不会尝试提供另一种验证框架。相反,它提供了一个名为`Validator`的简单接口,可以由任意数量的框架实现,如以下接口定义所示:
```
public interface Validator<T> {
void validate(T value) throws ValidationException;
}
```
契约是,如果对象无效,`validate`方法抛出一个异常,如果对象有效,则正常返回。 Spring 批处理提供了开箱即用的`ValidatingItemProcessor`,如以下 Bean 定义所示:
XML 配置
```
<bean class="org.springframework.batch.item.validator.ValidatingItemProcessor">
<property name="validator" ref="validator" />
</bean>
<bean id="validator" class="org.springframework.batch.item.validator.SpringValidator">
<property name="validator">
<bean class="org.springframework.batch.sample.domain.trade.internal.validator.TradeValidator"/>
</property>
</bean>
```
Java 配置
```
@Bean
public ValidatingItemProcessor itemProcessor() {
ValidatingItemProcessor processor = new ValidatingItemProcessor();
processor.setValidator(validator());
return processor;
}
@Bean
public SpringValidator validator() {
SpringValidator validator = new SpringValidator();
validator.setValidator(new TradeValidator());
return validator;
}
```
你还可以使用`BeanValidatingItemProcessor`来验证用 Bean 验证 API(JSR-303)注释的项。例如,给定以下类型`Person`:
```
class Person {
@NotEmpty
private String name;
public Person(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
```
可以通过在应用程序上下文中声明`BeanValidatingItemProcessor` Bean 来验证项,并在面向块的步骤中将其注册为处理器:
```
@Bean
public BeanValidatingItemProcessor<Person> beanValidatingItemProcessor() throws Exception {
BeanValidatingItemProcessor<Person> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
beanValidatingItemProcessor.setFilter(true);
return beanValidatingItemProcessor;
}
```
### 容错
当块被回滚时,在读取过程中缓存的项可能会被重新处理。如果一个步骤被配置为容错(通常通过使用跳过或重试处理),则所使用的任何`ItemProcessor`都应该以幂等的方式实现。通常,这将包括对`ItemProcessor`的输入项不执行任何更改,并且只更新结果中的实例。
\ No newline at end of file
此差异已折叠。
# 重复
## 重复
XMLJavaBoth
### repeatemplate
批处理是关于重复的操作,或者作为简单的优化,或者作为工作的一部分。 Spring Batch 具有`RepeatOperations`接口,可以对重复进行策略规划和推广,并提供相当于迭代器框架的内容。`RepeatOperations`接口具有以下定义:
```
public interface RepeatOperations {
RepeatStatus iterate(RepeatCallback callback) throws RepeatException;
}
```
回调是一个接口,如以下定义所示,它允许你插入一些要重复的业务逻辑:
```
public interface RepeatCallback {
RepeatStatus doInIteration(RepeatContext context) throws Exception;
}
```
回调会重复执行,直到实现确定迭代应该结束为止。这些接口中的返回值是一个枚举,可以是`RepeatStatus.CONTINUABLE``RepeatStatus.FINISHED`。一个`RepeatStatus`枚举向重复操作的调用者传递有关是否还有更多工作要做的信息。一般来说,`RepeatOperations`的实现应该检查`RepeatStatus`,并将其用作结束迭代的决策的一部分。任何希望向调用者发出信号表示没有更多工作要做的回调都可以返回`RepeatStatus.FINISHED`
`RepeatOperations`最简单的通用实现是`RepeatTemplate`,如下例所示:
```
RepeatTemplate template = new RepeatTemplate();
template.setCompletionPolicy(new SimpleCompletionPolicy(2));
template.iterate(new RepeatCallback() {
public RepeatStatus doInIteration(RepeatContext context) {
// Do stuff in batch...
return RepeatStatus.CONTINUABLE;
}
});
```
在前面的示例中,我们返回`RepeatStatus.CONTINUABLE`,以表明还有更多的工作要做。回调还可以返回`RepeatStatus.FINISHED`,向调用者发出信号,表示没有更多的工作要做。一些迭代可以由回调中所做的工作固有的考虑因素来终止。就回调而言,其他方法实际上是无限循环,并且完成决策被委托给外部策略,如前面示例中所示的情况。
#### repeatcontext
`RepeatCallback`的方法参数是`RepeatContext`。许多回调忽略了上下文。但是,如果有必要,它可以作为一个属性包来存储迭代期间的瞬态数据。在`iterate`方法返回后,上下文不再存在。
如果正在进行嵌套的迭代,则`RepeatContext`具有父上下文。父上下文有时用于存储需要在对`iterate`的调用之间共享的数据。例如,如果你想计算迭代中某个事件发生的次数,并在随后的调用中记住它,那么就是这种情况。
#### 重复状态
`RepeatStatus`是 Spring 批处理用来指示处理是否已经完成的枚举。它有两个可能的`RepeatStatus`值,如下表所示:
| *Value* |*说明*|
|-----------|--------------------------------------|
|CONTINUABLE|还有更多的工作要做。|
| FINISHED |不应再重复。|
`RepeatStatus`值也可以通过在`RepeatStatus`中使用`and()`方法与逻辑和操作结合。这样做的效果是在可持续的标志上做一个合乎逻辑的操作。换句话说,如果任一状态是`FINISHED`,则结果是`FINISHED`
### 完工政策
`RepeatTemplate`内,`iterate`方法中的循环的终止由`CompletionPolicy`确定,这也是`RepeatContext`的工厂。`RepeatTemplate`负责使用当前策略创建`RepeatContext`,并在迭代的每个阶段将其传递给`RepeatCallback`。回调完成其`doInIteration`后,`RepeatTemplate`必须调用`CompletionPolicy`,以要求它更新其状态(该状态将存储在`RepeatContext`中)。然后,它询问策略迭代是否完成。
Spring 批处理提供了`CompletionPolicy`的一些简单的通用实现。`SimpleCompletionPolicy`允许执行多达固定的次数(与`RepeatStatus.FINISHED`一起强制在任何时间提前完成)。
对于更复杂的决策,用户可能需要实现自己的完成策略。例如,一旦联机系统投入使用,一个批处理窗口就会阻止批处理作业的执行,这将需要一个自定义策略。
### 异常处理
如果在`RepeatCallback`中抛出了异常,则`RepeatTemplate`查询`ExceptionHandler`,该查询可以决定是否重新抛出异常。
下面的清单显示了`ExceptionHandler`接口定义:
```
public interface ExceptionHandler {
void handleException(RepeatContext context, Throwable throwable)
throws Throwable;
}
```
一个常见的用例是计算给定类型的异常数量,并在达到限制时失败。为此目的, Spring 批提供了`SimpleLimitExceptionHandler`和稍微更灵活的`RethrowOnThresholdExceptionHandler``SimpleLimitExceptionHandler`具有一个极限属性和一个异常类型,应该将其与当前异常进行比较。所提供类型的所有子类也被计算在内。给定类型的异常将被忽略,直到达到限制,然后重新抛出它们。其他类型的异常总是被重新抛出。
`SimpleLimitExceptionHandler`的一个重要的可选属性是名为`useParent`的布尔标志。默认情况下它是`false`,因此该限制仅在当前的`RepeatContext`中考虑。当设置为`true`时,该限制在嵌套迭代中跨兄弟上下文(例如步骤中的一组块)保持不变。
### 听众
通常情况下,能够接收跨多个不同迭代的交叉关注点的额外回调是有用的。为此, Spring Batch 提供了`RepeatListener`接口。`RepeatTemplate`允许用户注册`RepeatListener`实现,并且在迭代期间可用的情况下,他们将获得带有`RepeatContext``RepeatStatus`的回调。
`RepeatListener`接口具有以下定义:
```
public interface RepeatListener {
void before(RepeatContext context);
void after(RepeatContext context, RepeatStatus result);
void open(RepeatContext context);
void onError(RepeatContext context, Throwable e);
void close(RepeatContext context);
}
```
`open``close`回调出现在整个迭代之前和之后。`before``after`,和`onError`应用于单独的`RepeatCallback`调用。
请注意,当有多个侦听器时,它们在一个列表中,因此有一个顺序。在这种情况下,`open``before`的调用顺序相同,而`after``onError``close`的调用顺序相反。
### 并行处理
`RepeatOperations`的实现不限于按顺序执行回调。一些实现能够并行地执行它们的回调,这一点非常重要。为此, Spring Batch 提供了`TaskExecutorRepeatTemplate`,它使用 Spring `TaskExecutor`策略来运行`RepeatCallback`。默认值是使用`SynchronousTaskExecutor`,其效果是在相同的线程中执行整个迭代(与正常的`RepeatTemplate`相同)。
### 声明式迭代
有时,你知道有一些业务处理在每次发生时都想要重复。这方面的经典示例是消息管道的优化。如果一批消息经常到达,那么处理它们比为每条消息承担单独事务的成本更有效。 Spring Batch 提供了一个 AOP 拦截器,该拦截器仅为此目的将方法调用包装在`RepeatOperations`对象中。将`RepeatOperationsInterceptor`执行所截获的方法并根据所提供的`CompletionPolicy`中的`RepeatTemplate`进行重复。
下面的示例展示了使用 Spring AOP 命名空间来重复对名为`processMessage`的方法的服务调用的声明性迭代(有关如何配置 AOP 拦截器的更多详细信息,请参见 Spring 用户指南):
```
<aop:config>
<aop:pointcut id="transactional"
expression="execution(* com..*Service.processMessage(..))" />
<aop:advisor pointcut-ref="transactional"
advice-ref="retryAdvice" order="-1"/>
</aop:config>
<bean id="retryAdvice" class="org.spr...RepeatOperationsInterceptor"/>
```
下面的示例演示了如何使用 Java 配置来重复对一个名为`processMessage`的方法的服务调用(有关如何配置 AOP 拦截器的更多详细信息,请参见 Spring 用户指南):
```
@Bean
public MyService myService() {
ProxyFactory factory = new ProxyFactory(RepeatOperations.class.getClassLoader());
factory.setInterfaces(MyService.class);
factory.setTarget(new MyService());
MyService service = (MyService) factory.getProxy();
JdkRegexpMethodPointcut pointcut = new JdkRegexpMethodPointcut();
pointcut.setPatterns(".*processMessage.*");
RepeatOperationsInterceptor interceptor = new RepeatOperationsInterceptor();
((Advised) service).addAdvisor(new DefaultPointcutAdvisor(pointcut, interceptor));
return service;
}
```
前面的示例在拦截器内部使用默认的`RepeatTemplate`。要更改策略、侦听器和其他详细信息,可以将`RepeatTemplate`的实例注入拦截器。
如果截获的方法返回`void`,那么拦截器总是返回`RepeatStatus.CONTINUABLE`(因此,如果`CompletionPolicy`没有有限的端点,则存在无限循环的危险)。否则,它将返回`RepeatStatus.CONTINUABLE`,直到截获的方法的返回值是`null`,此时它将返回`RepeatStatus.FINISHED`。因此,目标方法中的业务逻辑可以通过返回`null`或抛出一个异常来表示没有更多的工作要做,该异常是由提供的`ExceptionHandler`中的`RepeatTemplate`重新抛出的。
# 重试
## 重试
XMLJavaBoth
为了使处理更健壮,更不容易失败,有时自动重试失败的操作会有所帮助,以防随后的尝试可能会成功。容易发生间歇性故障的错误通常是暂时的。例如,对 Web 服务的远程调用由于网络故障或数据库更新中的`DeadlockLoserDataAccessException`而失败。
### `RetryTemplate`
| |重试功能在 2.2.0 时从 Spring 批中退出。<br/>它现在是一个新库[Spring Retry](https://github.com/spring-projects/spring-retry)的一部分。|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
要自动化重试操作 Spring,批处理有`RetryOperations`策略。以下是`RetryOperations`的接口定义:
```
public interface RetryOperations {
<T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback) throws E;
<T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback)
throws E;
<T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, RetryState retryState)
throws E, ExhaustedRetryException;
<T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback,
RetryState retryState) throws E;
}
```
Basic Callback 是一个简单的接口,允许你插入一些要重试的业务逻辑,如下面的接口定义所示:
```
public interface RetryCallback<T, E extends Throwable> {
T doWithRetry(RetryContext context) throws E;
}
```
回调会运行,如果它失败(通过抛出`Exception`),则会重试它,直到它成功或实现中止为止。在`RetryOperations`接口中有许多重载的`execute`方法。当所有的重试尝试都用完时,这些方法处理用于恢复的各种用例,并处理重试状态,这使客户机和实现在调用之间存储信息(我们将在本章后面详细介绍这一点)。
`RetryOperations`的最简单的通用实现是`RetryTemplate`。其用途如下:
```
RetryTemplate template = new RetryTemplate();
TimeoutRetryPolicy policy = new TimeoutRetryPolicy();
policy.setTimeout(30000L);
template.setRetryPolicy(policy);
Foo result = template.execute(new RetryCallback<Foo>() {
public Foo doWithRetry(RetryContext context) {
// Do stuff that might fail, e.g. webservice operation
return result;
}
});
```
在前面的示例中,我们进行一个 Web 服务调用,并将结果返回给用户。如果该调用失败,则重试该调用,直到达到超时为止。
#### `RetryContext`
`RetryCallback`的方法参数是`RetryContext`。许多回调忽略了上下文,但如果有必要,它可以作为一个属性包来存储迭代期间的数据。
如果同一个线程中有一个正在进行的嵌套重试,则`RetryContext`具有父上下文。父上下文有时用于存储需要在对`execute`的调用之间共享的数据。
#### `RecoveryCallback`
当重试用完时,`RetryOperations`可以将控制权传递给另一个回调,称为`RecoveryCallback`。要使用此功能,客户机将回调一起传递给相同的方法,如以下示例所示:
```
Foo foo = template.execute(new RetryCallback<Foo>() {
public Foo doWithRetry(RetryContext context) {
// business logic here
},
new RecoveryCallback<Foo>() {
Foo recover(RetryContext context) throws Exception {
// recover logic here
}
});
```
如果业务逻辑在模板决定中止之前没有成功,那么客户机将有机会通过恢复回调执行一些替代处理。
#### 无状态重试
在最简单的情况下,重试只是一个 while 循环。`RetryTemplate`可以一直尝试,直到成功或失败为止。`RetryContext`包含一些状态来决定是重试还是中止,但是这个状态在堆栈上,不需要在全局的任何地方存储它,所以我们将其称为无状态重试。无状态重试和有状态重试之间的区别包含在`RetryPolicy`的实现中(`RetryTemplate`可以同时处理这两个)。在无状态的重试中,重试回调总是在它失败时所在的线程中执行。
#### 有状态重试
在故障导致事务资源无效的情况下,有一些特殊的考虑因素。这不适用于简单的远程调用,因为(通常)没有事务性资源,但有时确实适用于数据库更新,尤其是在使用 Hibernate 时。在这种情况下,只有立即重新抛出调用故障的异常才有意义,这样事务就可以回滚,并且我们可以启动一个新的有效事务。
在涉及事务的情况下,无状态重试还不够好,因为重新抛出和回滚必然涉及离开`RetryOperations.execute()`方法,并且可能会丢失堆栈上的上下文。为了避免丢失它,我们必须引入一种存储策略,将其从堆栈中取出,并将其(至少)放在堆存储中。为此, Spring 批提供了一种名为`RetryContextCache`的存储策略,它可以被注入到`RetryTemplate`中。`RetryContextCache`的默认实现是在内存中,使用一个简单的`Map`。在集群环境中使用多个进程的高级用法还可以考虑使用某种类型的集群缓存来实现`RetryContextCache`(但是,即使在集群环境中,这也可能是过度使用)。
`RetryOperations`的部分职责是识别在新执行中(并且通常包装在新事务中)返回的失败操作。为了促进这一点, Spring Batch 提供了`RetryState`抽象。这与`RetryOperations`接口中的特殊`execute`方法一起工作。
识别失败操作的方法是在重试的多个调用之间识别状态。为了识别状态,用户可以提供一个`RetryState`对象,该对象负责返回标识该项的唯一密钥。标识符在`RetryContextCache`接口中用作键。
| |在<br/>中实现`Object.equals()``Object.hashCode()`时要非常小心。最好的建议是使用业务键来标识<br/>项。在 JMS 消息的情况下,可以使用消息 ID。|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
当重试用完时,还可以选择以不同的方式处理失败的项,而不是调用`RetryCallback`(现在认为很可能会失败)。就像在无状态的情况下一样,这个选项是由`RecoveryCallback`提供的,可以通过将其传递到`execute``RetryOperations`方法来提供。
是否重试的决定实际上被委托给一个常规的`RetryPolicy`,因此通常对限制和超时的关注可以被注入到那里(在本章后面描述)。
### 重试策略
`RetryTemplate`中,在`execute`方法中重试或失败的决定由`RetryPolicy`决定,这也是`RetryContext`的工厂。`RetryTemplate`负责使用当前策略创建`RetryContext`,并在每次尝试时将其传递给`RetryCallback`。回调失败后,`RetryTemplate`必须调用`RetryPolicy`,要求它更新其状态(存储在`RetryContext`中),然后询问策略是否可以进行另一次尝试。如果无法进行另一次尝试(例如,当达到限制或检测到超时时时),则策略还负责处理耗尽状态。简单的实现方式会抛出`RetryExhaustedException`,这会导致任何封闭事务被回滚。更复杂的实现可能会尝试采取一些恢复操作,在这种情况下,事务可以保持不变。
| |失败本质上要么是可重复的,要么是不可重复的。如果从业务逻辑中总是抛出相同的异常<br/>,则重试没有好处。因此,不要重试所有<br/>异常类型。相反,尝试只关注那些你期望<br/>可重新尝试的异常。更积极地重试通常不会对业务逻辑造成损害,但是<br/>这是浪费的,因为如果失败是确定性的,那么你将花费时间重试一些你事先知道是致命的<br/>。|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
Spring 批处理提供了无状态`RetryPolicy`的一些简单的通用实现,例如`SimpleRetryPolicy``TimeoutRetryPolicy`(在前面的示例中使用)。
`SimpleRetryPolicy`允许对任何已命名的异常类型列表进行重试,重试次数最多为固定次数。它还具有一个“致命”异常列表,这些异常永远不应该被重试,并且这个列表覆盖了可重试列表,以便可以使用它对重试行为进行更好的控制,如下面的示例所示:
```
SimpleRetryPolicy policy = new SimpleRetryPolicy();
// Set the max retry attempts
policy.setMaxAttempts(5);
// Retry on all exceptions (this is the default)
policy.setRetryableExceptions(new Class[] {Exception.class});
// ... but never retry IllegalStateException
policy.setFatalExceptions(new Class[] {IllegalStateException.class});
// Use the policy...
RetryTemplate template = new RetryTemplate();
template.setRetryPolicy(policy);
template.execute(new RetryCallback<Foo>() {
public Foo doWithRetry(RetryContext context) {
// business logic here
}
});
```
还有一个更灵活的实现叫做`ExceptionClassifierRetryPolicy`,它允许用户通过`ExceptionClassifier`抽象为任意一组异常类型配置不同的重试行为。该策略的工作原理是调用分类器将异常转换为委托`RetryPolicy`。例如,通过将一种异常类型映射到另一种策略,可以在失败前重试更多次。
用户可能需要实现他们自己的重试策略,以做出更多定制的决策。例如,当存在已知的、特定于解决方案的异常的可重试和不可重试的分类时,自定义重试策略是有意义的。
### 退避政策
当在短暂的失败之后重试时,在再次尝试之前等待一下通常会有所帮助,因为通常故障是由某些只能通过等待解决的问题引起的。如果`RetryCallback`失败,`RetryTemplate`可以根据`BackoffPolicy`暂停执行。
下面的代码显示了`BackOffPolicy`接口的接口定义:
```
public interface BackoffPolicy {
BackOffContext start(RetryContext context);
void backOff(BackOffContext backOffContext)
throws BackOffInterruptedException;
}
```
a`BackoffPolicy`可以自由地以它选择的任何方式实现退避。 Spring Batch Out of the Box 提供的策略都使用。一个常见的用例是后退,等待时间呈指数增长,以避免两次重试进入锁定步骤,两次都失败(这是从以太网学到的经验教训)。为此, Spring batch 提供了`ExponentialBackoffPolicy`
### 听众
通常情况下,能够接收跨多个不同重试中的交叉关注点的额外回调是有用的。为此, Spring Batch 提供了`RetryListener`接口。`RetryTemplate`允许用户注册`RetryListeners`,并且在迭代期间可用的情况下,给出带有`RetryContext``Throwable`的回调。
下面的代码显示了`RetryListener`的接口定义:
```
public interface RetryListener {
<T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback);
<T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable);
<T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable);
}
```
在最简单的情况下,`open``close`回调出现在整个重试之前和之后,并且`onError`应用于单个`RetryCallback`调用。`close`方法也可能接收`Throwable`。如果出现错误,则是`RetryCallback`抛出的最后一个错误。
请注意,当有多个侦听器时,它们在一个列表中,因此有一个顺序。在这种情况下,以相同的顺序调用`open`,而以相反的顺序调用`onError``close`
### 声明式重试
有时,你知道有些业务处理在每次发生时都想要重试。这方面的典型例子是远程服务调用。 Spring Batch 提供了 AOP 拦截器,该拦截器仅为此目的在`RetryOperations`实现中包装方法调用。根据提供的`RepeatTemplate`中的`RetryPolicy``RetryOperationsInterceptor`执行截获的方法并在失败时重试。
下面的示例显示了一个声明性重试,它使用 Spring AOP 命名空间重试对一个名为`remoteCall`的方法的服务调用(有关如何配置 AOP 拦截器的更多详细信息,请参见 Spring 用户指南):
```
<aop:config>
<aop:pointcut id="transactional"
expression="execution(* com..*Service.remoteCall(..))" />
<aop:advisor pointcut-ref="transactional"
advice-ref="retryAdvice" order="-1"/>
</aop:config>
<bean id="retryAdvice"
class="org.springframework.retry.interceptor.RetryOperationsInterceptor"/>
```
下面的示例显示了一个声明性重试,它使用 Java 配置重试对一个名为`remoteCall`的方法的服务调用(有关如何配置 AOP 拦截器的更多详细信息,请参见 Spring 用户指南):
```
@Bean
public MyService myService() {
ProxyFactory factory = new ProxyFactory(RepeatOperations.class.getClassLoader());
factory.setInterfaces(MyService.class);
factory.setTarget(new MyService());
MyService service = (MyService) factory.getProxy();
JdkRegexpMethodPointcut pointcut = new JdkRegexpMethodPointcut();
pointcut.setPatterns(".*remoteCall.*");
RetryOperationsInterceptor interceptor = new RetryOperationsInterceptor();
((Advised) service).addAdvisor(new DefaultPointcutAdvisor(pointcut, interceptor));
return service;
}
```
前面的示例在拦截器内部使用默认的`RetryTemplate`。要更改策略或侦听器,可以将`RetryTemplate`的实例注入拦截器。
\ No newline at end of file
# 缩放和并行处理
## 缩放和并行处理
XMLJavaBoth
许多批处理问题可以通过单线程、单流程作业来解决,因此在考虑更复杂的实现之前,正确地检查它是否满足你的需求始终是一个好主意。衡量一项实际工作的性能,看看最简单的实现是否首先满足你的需求。即使使用标准的硬件,你也可以在一分钟内读写几百兆的文件。
Spring 当你准备好开始用一些并行处理来实现一个作业时, Spring Batch 提供了一系列选项,这些选项在本章中进行了描述,尽管其他地方也介绍了一些特性。在高层次上,有两种并行处理模式:
* 单过程、多线程
* 多进程
这些指标也可分为以下几类:
* 多线程步骤(单进程)
* 并行步骤(单一过程)
* 步骤的远程分块(多进程)
* 划分一个步骤(单个或多个进程)
首先,我们回顾一下单流程选项。然后,我们回顾了多进程的选择。
### 多线程步骤
启动并行处理的最简单方法是在步骤配置中添加`TaskExecutor`
例如,你可以添加`tasklet`的一个属性,如下所示:
```
<step id="loading">
<tasklet task-executor="taskExecutor">...</tasklet>
</step>
```
当使用 Java 配置时,可以将`TaskExecutor`添加到该步骤中,如以下示例所示:
Java 配置
```
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}
@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
return this.stepBuilderFactory.get("sampleStep")
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.build();
}
```
在此示例中,`taskExecutor`是对另一个 Bean 定义的引用,该定义实现了`TaskExecutor`接口。[`TaskExecutor`](https://DOCS. Spring.io/ Spring/DOCS/current/javadoc-api/org/springframework/core/core/task/taskexecutor.html)是一个标准的 Spring 接口,因此请参阅 Spring 用户指南以获得可用实现的详细信息。最简单的多线程`TaskExecutor``SimpleAsyncTaskExecutor`
上述配置的结果是,`Step`通过在单独的执行线程中读取、处理和写入每个项块(每个提交间隔)来执行。请注意,这意味着要处理的项没有固定的顺序,并且块可能包含与单线程情况相比非连续的项。除了任务执行器设置的任何限制(例如它是否由线程池支持)之外,Tasklet 配置中还有一个油门限制,默认为 4。你可能需要增加这一点,以确保线程池得到充分利用。
例如,你可能会增加油门限制,如以下示例所示:
```
<step id="loading"> <tasklet
task-executor="taskExecutor"
throttle-limit="20">...</tasklet>
</step>
```
在使用 Java 配置时,构建器提供对油门限制的访问,如以下示例所示:
Java 配置
```
@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
return this.stepBuilderFactory.get("sampleStep")
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.throttleLimit(20)
.build();
}
```
还请注意,在你的步骤中使用的任何池资源都可能对并发性施加限制,例如`DataSource`。确保这些资源中的池至少与步骤中所需的并发线程数量一样大。
对于一些常见的批处理用例,使用多线程`Step`实现有一些实际的限制。`Step`中的许多参与者(例如读者和作者)是有状态的。如果状态不是由线程隔离的,那么这些组件在多线程`Step`中是不可用的。特别是, Spring 批中的大多数现成的读取器和编写器都不是为多线程使用而设计的。然而,可以使用无状态的或线程安全的读取器和编写器,并且在[Spring Batch Samples](https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples)中有一个示例(称为`parallelJob`),该示例显示了使用过程指示器(参见[防止状态持久性](readersAndWriters.html#process-indicator))来跟踪在数据库输入表中已处理的项。
Spring 批处理提供了`ItemWriter``ItemReader`的一些实现方式。通常,他们会在 Javadoc 中说明它们是否是线程安全的,或者你必须做什么来避免在并发环境中出现问题。如果 Javadoc 中没有信息,则可以检查实现,以查看是否存在任何状态。如果阅读器不是线程安全的,那么你可以使用提供的`SynchronizedItemStreamReader`来装饰它,或者在你自己的同步委托程序中使用它。你可以将调用同步到`read()`,并且只要处理和写入是块中最昂贵的部分,你的步骤仍然可以比在单线程配置中快得多地完成。
### 平行步骤
只要需要并行化的应用程序逻辑可以划分为不同的职责,并分配给各个步骤,那么就可以在单个流程中进行并行化。并行步骤执行很容易配置和使用。
例如,与`step3`并行执行`(step1,step2)`的步骤是直接的,如以下示例所示:
```
<job id="job1">
<split id="split1" task-executor="taskExecutor" next="step4">
<flow>
<step id="step1" parent="s1" next="step2"/>
<step id="step2" parent="s2"/>
</flow>
<flow>
<step id="step3" parent="s3"/>
</flow>
</split>
<step id="step4" parent="s4"/>
</job>
<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>
```
当使用 Java 配置时,与`(step1,step2)`并行执行步骤`step3`是很简单的,如以下示例所示:
Java 配置
```
@Bean
public Job job() {
return jobBuilderFactory.get("job")
.start(splitFlow())
.next(step4())
.build() //builds FlowJobBuilder instance
.build(); //builds Job instance
}
@Bean
public Flow splitFlow() {
return new FlowBuilder<SimpleFlow>("splitFlow")
.split(taskExecutor())
.add(flow1(), flow2())
.build();
}
@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}
@Bean
public Flow flow2() {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.build();
}
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}
```
可配置任务执行器用于指定应该使用哪个`TaskExecutor`实现来执行各个流。默认值是`SyncTaskExecutor`,但是需要一个异步`TaskExecutor`来并行运行这些步骤。请注意,该作业确保在聚合退出状态和转换之前,拆分中的每个流都已完成。
有关更多详细信息,请参见[拆分流](step.html#split-flows)一节。
### 远程分块
在远程分块中,`Step`处理被分割到多个进程中,通过一些中间件相互通信。下图显示了该模式:
![远程分块](https://docs.spring.io/spring-batch/docs/current/reference/html/images/remote-chunking.png)
图 1。远程分块
Manager 组件是一个单独的进程,工作人员是多个远程进程。如果 Manager 不是瓶颈,那么这种模式最有效,因此处理必须比读取项目更昂贵(在实践中通常是这种情况)。
Manager 是 Spring 批处理`Step`的实现,其中`ItemWriter`被一个通用版本代替,该版本知道如何将项目块作为消息发送到中间件。工人是正在使用的任何中间件的标准侦听器(例如,对于 JMS,他们将是`MessageListener`实现),他们的角色是通过`ItemWriter``ItemProcessor`加上`ItemWriter`接口使用标准的项块。使用这种模式的优点之一是读写器、处理器和写写器组件是现成的(与用于步骤的本地执行的组件相同)。这些项是动态划分的,工作是通过中间件共享的,因此,如果侦听器都是热心的消费者,那么负载平衡就是自动的。
中间件必须是持久的,保证交付,并且每条消息只有一个使用者。JMS 是显而易见的候选者,但在网格计算和共享内存产品空间中存在其他选项(例如 JavaSpace)。
有关更多详细信息,请参见[Spring Batch Integration - Remote Chunking](spring-batch-integration.html#remote-chunking)一节。
### 分区
Spring 批处理还提供了用于分区`Step`执行并远程执行它的 SPI。在这种情况下,远程参与者是`Step`实例,这些实例可以很容易地被配置并用于本地处理。下图显示了该模式:
![分区概述](https://docs.spring.io/spring-batch/docs/current/reference/html/images/partitioning-overview.png)
图 2。划分
`Job`作为`Step`实例的序列在左侧运行,其中一个`Step`实例被标记为管理器。这张图中的工人都是`Step`的相同实例,它实际上可以代替经理,从而导致`Job`的结果相同。工作人员通常是远程服务,但也可能是执行的本地线程。在此模式中,经理发送给工作人员的消息不需要是持久的,也不需要有保证的交付。 Spring `JobRepository`中的批处理元数据确保每个工作者执行一次,并且对于每个`Job`执行只执行一次。
Spring 批处理中的 SPI 由`Step`(称为`PartitionStep`)的特殊实现和需要为特定环境实现的两个策略接口组成。策略接口是`PartitionHandler``StepExecutionSplitter`,它们的作用在下面的序列图中显示:
![分区 SPI](https://docs.spring.io/spring-batch/docs/current/reference/html/images/partitioning-spi.png)
图 3。分区 SPI
在这种情况下,右边的`Step`是“远程”工作者,因此,潜在地,有许多对象和或进程在扮演这个角色,并且`PartitionStep`被显示为驱动执行。
下面的示例显示了使用 XML 配置时的`PartitionStep`配置:
```
<step id="step1.manager">
<partition step="step1" partitioner="partitioner">
<handler grid-size="10" task-executor="taskExecutor"/>
</partition>
</step>
```
下面的示例显示了使用 Java 配置时的`PartitionStep`配置:
Java 配置
```
@Bean
public Step step1Manager() {
return stepBuilderFactory.get("step1.manager")
.<String, String>partitioner("step1", partitioner())
.step(step1())
.gridSize(10)
.taskExecutor(taskExecutor())
.build();
}
```
与多线程步骤的`throttle-limit`属性类似,`grid-size`属性防止任务执行器被来自单个步骤的请求饱和。
有一个简单的示例,可以在[Spring Batch Samples](https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples/src/main/resources/jobs)的单元测试套件中进行复制和扩展(参见`partition*Job.xml`配置)。
Spring 批处理为被称为“Step1:Partition0”的分区创建步骤执行,以此类推。为了保持一致性,许多人更喜欢将 Manager 步骤称为“Step1:Manager”。你可以为步骤使用别名(通过指定`name`属性而不是`id`属性)。
#### 分区处理程序
`PartitionHandler`是了解远程或网格环境结构的组件。它能够将`StepExecution`请求发送到远程`Step`实例,并以某种特定于织物的格式包装,例如 DTO。它不需要知道如何分割输入数据或如何聚合多个`Step`执行的结果。一般来说,它可能也不需要了解弹性或故障转移,因为在许多情况下,这些都是织物的功能。在任何情况下, Spring 批处理总是提供独立于织物的重启性。失败的`Job`总是可以重新启动,并且只重新执行失败的`Steps`
`PartitionHandler`接口可以为各种结构类型提供专门的实现,包括简单的 RMI 远程处理、EJB 远程处理、自定义 Web 服务、JMS、Java 空间、共享内存网格(如 Terracotta 或 Coherence)和网格执行结构(如 GridGain)。 Spring 批处理不包含用于任何专有网格或远程织物的实现方式。
Spring 然而,批处理确实提供了`PartitionHandler`的一种有用的实现,该实现使用 Spring 中的`TaskExecutor`策略,在单独的执行线程中本地执行`Step`实例。该实现被称为`TaskExecutorPartitionHandler`
`TaskExecutorPartitionHandler`是使用前面显示的 XML 名称空间进行配置的步骤的默认值。也可以显式地对其进行配置,如以下示例所示:
```
<step id="step1.manager">
<partition step="step1" handler="handler"/>
</step>
<bean class="org.spr...TaskExecutorPartitionHandler">
<property name="taskExecutor" ref="taskExecutor"/>
<property name="step" ref="step1" />
<property name="gridSize" value="10" />
</bean>
```
`TaskExecutorPartitionHandler`可以在 Java 配置中显式地进行配置,如以下示例所示:
Java 配置
```
@Bean
public Step step1Manager() {
return stepBuilderFactory.get("step1.manager")
.partitioner("step1", partitioner())
.partitionHandler(partitionHandler())
.build();
}
@Bean
public PartitionHandler partitionHandler() {
TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
retVal.setTaskExecutor(taskExecutor());
retVal.setStep(step1());
retVal.setGridSize(10);
return retVal;
}
```
`gridSize`属性决定要创建的独立步骤执行的数量,因此它可以与`TaskExecutor`中线程池的大小匹配。或者,可以将其设置为比可用的线程数量更大,这使得工作块更小。
`TaskExecutorPartitionHandler`对于 IO 密集型`Step`实例很有用,例如复制大量文件或将文件系统复制到内容管理系统中。它还可以通过提供`Step`实现来用于远程执行,该实现是远程调用的代理(例如使用 Spring remoting)。
#### 分割者
`Partitioner`有一个更简单的职责:仅为新的步骤执行生成执行上下文作为输入参数(无需担心重新启动)。它只有一个方法,如下面的接口定义所示:
```
public interface Partitioner {
Map<String, ExecutionContext> partition(int gridSize);
}
```
这个方法的返回值将每个步骤执行的唯一名称(`String`)与输入参数(`ExecutionContext`)以`ExecutionContext`的形式关联起来。这些名称稍后会在批处理元数据中显示为分区`StepExecutions`中的步骤名称。`ExecutionContext`只是一组名称-值对,因此它可能包含一系列主键、行号或输入文件的位置。然后,远程`Step`通常使用`#{…​}`占位符(在步骤作用域中的后期绑定)绑定到上下文输入,如下一节所示。
步骤执行的名称(由`Partitioner`返回的`Map`中的键)需要在`Job`的步骤执行中是唯一的,但没有任何其他特定的要求。要做到这一点(并使名称对用户有意义),最简单的方法是使用前缀 + 后缀命名约定,其中前缀是正在执行的步骤的名称(它本身在`Job`中是唯一的),后缀只是一个计数器。在使用该约定的框架中有一个`SimplePartitioner`
可以使用一个名为`PartitionNameProvider`的可选接口来提供与分区本身分开的分区名称。如果`Partitioner`实现了这个接口,那么在重新启动时,只会查询名称。如果分区是昂贵的,这可以是一个有用的优化。由`PartitionNameProvider`提供的名称必须与`Partitioner`提供的名称匹配。
#### 将输入数据绑定到步骤
`PartitionHandler`执行的步骤具有相同的配置,并且它们的输入参数在运行时从`ExecutionContext`绑定,这是非常有效的。 Spring 批处理的 StepScope 特性很容易做到这一点(在[后期绑定](step.html#late-binding)一节中更详细地介绍)。例如,如果`Partitioner`使用一个名为`fileName`的属性键创建`ExecutionContext`实例,并针对每个步骤调用指向不同的文件(或目录),则`Partitioner`输出可能类似于下表的内容:
|*步骤执行名称(键)*|*ExecutionContext (value)*|
|---------------------------|--------------------------|
|filecopy:分区 0| fileName=/home/data/one |
|filecopy:partition1| fileName=/home/data/two |
|filecopy:partition2|fileName=/home/data/three |
然后,可以使用与执行上下文的后期绑定将文件名绑定到一个步骤。
下面的示例展示了如何在 XML 中定义后期绑定:
XML 配置
```
<bean id="itemReader" scope="step"
class="org.spr...MultiResourceItemReader">
<property name="resources" value="#{stepExecutionContext[fileName]}/*"/>
</bean>
```
下面的示例展示了如何在 Java 中定义后期绑定:
Java 配置
```
@Bean
public MultiResourceItemReader itemReader(
@Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) {
return new MultiResourceItemReaderBuilder<String>()
.delegate(fileReader())
.name("itemReader")
.resources(resources)
.build();
}
```
\ No newline at end of file
# 元数据模式
## 附录 A:元数据模式
### 概述
Spring 批处理元数据表与在 Java 中表示它们的域对象非常匹配。例如,`JobInstance``JobExecution``JobParameters`,和`StepExecution`分别映射到`BATCH_JOB_INSTANCE``BATCH_JOB_EXECUTION``BATCH_JOB_EXECUTION_PARAMS``BATCH_STEP_EXECUTION``ExecutionContext`映射到`BATCH_JOB_EXECUTION_CONTEXT``BATCH_STEP_EXECUTION_CONTEXT``JobRepository`负责将每个 Java 对象保存并存储到其正确的表中。本附录详细描述了元数据表,以及在创建元数据表时做出的许多设计决策。在查看下面的各种表创建语句时,重要的是要认识到所使用的数据类型是尽可能通用的。 Spring Batch 提供了许多模式作为示例,所有这些模式都具有不同的数据类型,这是由于各个数据库供应商处理数据类型的方式有所不同。下图显示了所有 6 个表及其相互关系的 ERD 模型:
![Spring Batch Meta-Data ERD](https://docs.spring.io/spring-batch/docs/current/reference/html/images/meta-data-erd.png)
图 1。 Spring 批处理元数据 ERD
#### 示例 DDL 脚本
Spring 批处理核心 JAR 文件包含用于为许多数据库平台创建关系表的示例脚本(反过来,这些平台由作业存储库工厂 Bean 或等效的名称空间自动检测)。这些脚本可以按原样使用,也可以根据需要修改附加的索引和约束。文件名的形式为`schema-*.sql`,其中“\*”是目标数据库平台的简称。脚本在包`org.springframework.batch.core`中。
#### 迁移 DDL 脚本
Spring Batch 提供了在升级版本时需要执行的迁移 DDL 脚本。这些脚本可以在`org/springframework/batch/core/migration`下的核心 JAR 文件中找到。迁移脚本被组织到与版本号对应的文件夹中,这些版本号被引入:
* `2.2`:如果你从`2.2`之前的版本迁移到`2.2`版本,则包含所需的脚本
* `4.1`:如果你从`4.1`之前的版本迁移到`4.1`版本,则包含所需的脚本
#### 版本
本附录中讨论的许多数据库表都包含一个版本列。这一列很重要,因为 Spring 批处理在处理数据库更新时采用了乐观的锁定策略。这意味着每次“触摸”(更新)记录时,Version 列中的值都会增加一个。当存储库返回以保存该值时,如果版本号发生了更改,它将抛出一个`OptimisticLockingFailureException`,表示在并发访问中出现了错误。这种检查是必要的,因为即使不同的批处理作业可能在不同的机器中运行,它们都使用相同的数据库表。
#### 恒等式
`BATCH_JOB_INSTANCE``BATCH_JOB_EXECUTION``BATCH_STEP_EXECUTION`都包含以`_ID`结尾的列。这些字段充当各自表的主键。然而,它们不是数据库生成的密钥。相反,它们是由单独的序列生成的。这是必要的,因为在将一个域对象插入到数据库中之后,需要在实际对象上设置给定的键,以便在 Java 中对它们进行唯一标识。较新的数据库驱动程序(JDBC3.0 及以上版本)通过数据库生成的键支持此功能。然而,使用的是序列,而不是要求该功能。模式的每个变体都包含以下语句的某种形式:
```
CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ;
CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ;
CREATE SEQUENCE BATCH_JOB_SEQ;
```
许多数据库供应商不支持序列。在这些情况下,使用了变通方法,例如 MySQL 的以下语句:
```
CREATE TABLE BATCH_STEP_EXECUTION_SEQ (ID BIGINT NOT NULL) type=InnoDB;
INSERT INTO BATCH_STEP_EXECUTION_SEQ values(0);
CREATE TABLE BATCH_JOB_EXECUTION_SEQ (ID BIGINT NOT NULL) type=InnoDB;
INSERT INTO BATCH_JOB_EXECUTION_SEQ values(0);
CREATE TABLE BATCH_JOB_SEQ (ID BIGINT NOT NULL) type=InnoDB;
INSERT INTO BATCH_JOB_SEQ values(0);
```
在前一种情况下,用一个表来代替每个序列。 Spring 核心类`MySQLMaxValueIncrementer`然后在这个序列中增加一列,以便提供类似的功能。
### `BATCH_JOB_INSTANCE`
`BATCH_JOB_INSTANCE`表保存了与`JobInstance`相关的所有信息,并作为整个层次结构的顶部。下面的通用 DDL 语句用于创建它:
```
CREATE TABLE BATCH_JOB_INSTANCE (
JOB_INSTANCE_ID BIGINT PRIMARY KEY ,
VERSION BIGINT,
JOB_NAME VARCHAR(100) NOT NULL ,
JOB_KEY VARCHAR(2500)
);
```
下面的列表描述了表中的每个列:
* `JOB_INSTANCE_ID`:标识实例的唯一 ID。这也是主要的关键。可以通过调用`JobInstance`上的`getId`方法获得此列的值。
* `VERSION`:见[Version](#metaDataVersion)
* `JOB_NAME`:从`Job`对象获得的作业的名称。因为需要它来标识实例,所以它不能是空的。
* `JOB_KEY`:`JobParameters`的序列化,该序列化唯一地标识同一作业的不同实例。(具有相同工作名称的`JobInstances`必须有不同的`JobParameters`,因此,不同的`JOB_KEY`值)。
### `BATCH_JOB_EXECUTION_PARAMS`
`BATCH_JOB_EXECUTION_PARAMS`表包含与`JobParameters`对象相关的所有信息。它包含传递给`Job`的 0 个或更多个键/值对,并用作运行作业的参数的记录。对于每个有助于生成作业标识的参数,`IDENTIFYING`标志被设置为 true。请注意,该表已被非规范化。不是为每个类型创建一个单独的表,而是有一个表,其中有一列指示类型,如下面的清单所示:
```
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
JOB_EXECUTION_ID BIGINT NOT NULL ,
TYPE_CD VARCHAR(6) NOT NULL ,
KEY_NAME VARCHAR(100) NOT NULL ,
STRING_VAL VARCHAR(250) ,
DATE_VAL DATETIME DEFAULT NULL ,
LONG_VAL BIGINT ,
DOUBLE_VAL DOUBLE PRECISION ,
IDENTIFYING CHAR(1) NOT NULL ,
constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
);
```
下面的列表描述了每一列:
* `JOB_EXECUTION_ID`:来自`BATCH_JOB_EXECUTION`表的外键,该外键指示参数项所属的作业执行。请注意,每个执行都可能存在多个行(即键/值对)。
* type\_cd:存储的值的类型的字符串表示形式,可以是字符串、日期、长值或双值。因为类型必须是已知的,所以它不能是空的。
* key\_name:参数键。
* string\_val:参数值,如果类型是 string。
* date\_val:参数值,如果类型是 date。
* long\_val:参数值,如果类型是 long。
* double\_val:参数值,如果类型是 double。
* 标识:标志,指示参数是否有助于相关的`JobInstance`的标识。
请注意,此表没有主键。这是因为该框架不需要一个框架,因此不需要它。如果需要,可以添加主键,也可以添加与数据库生成的键,而不会对框架本身造成任何问题。
### `BATCH_JOB_EXECUTION`
`BATCH_JOB_EXECUTION`表包含与`JobExecution`对象相关的所有信息。每次运行`Job`时,总会有一个新的`JobExecution`,并在此表中有一个新的行。下面的清单显示了`BATCH_JOB_EXECUTION`表的定义:
```
CREATE TABLE BATCH_JOB_EXECUTION (
JOB_EXECUTION_ID BIGINT PRIMARY KEY ,
VERSION BIGINT,
JOB_INSTANCE_ID BIGINT NOT NULL,
CREATE_TIME TIMESTAMP NOT NULL,
START_TIME TIMESTAMP DEFAULT NULL,
END_TIME TIMESTAMP DEFAULT NULL,
STATUS VARCHAR(10),
EXIT_CODE VARCHAR(20),
EXIT_MESSAGE VARCHAR(2500),
LAST_UPDATED TIMESTAMP,
JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
constraint JOB_INSTANCE_EXECUTION_FK foreign key (JOB_INSTANCE_ID)
references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ;
```
下面的列表描述了每一列:
* `JOB_EXECUTION_ID`:唯一标识此执行的主键。通过调用`JobExecution`对象的`getId`方法,可以获得此列的值。
* `VERSION`:见[Version](#metaDataVersion)
* `JOB_INSTANCE_ID`:来自`BATCH_JOB_INSTANCE`表的外键。它指示此执行所属的实例。每个实例可能有多个执行。
* `CREATE_TIME`:表示创建执行时间的时间戳。
* `START_TIME`:表示开始执行的时间的时间戳。
* `END_TIME`:时间戳表示执行完成的时间,无论成功还是失败。当作业当前未运行时,此列中的空值表示发生了某种类型的错误,框架在失败之前无法执行最后一次保存。
* `STATUS`:表示执行状态的字符串。这可能是`COMPLETED``STARTED`等。此列的对象表示是`BatchStatus`枚举。
* `EXIT_CODE`:表示执行的退出代码的字符串。在命令行作业的情况下,可以将其转换为数字。
* `EXIT_MESSAGE`:表示作业如何退出的更详细描述的字符串。在失败的情况下,这可能包括尽可能多的堆栈跟踪。
* `LAST_UPDATED`:时间戳表示此执行最后一次被持久化的时间。
### `BATCH_STEP_EXECUTION`
批处理 \_step\_execution 表保存与`StepExecution`对象相关的所有信息。该表在许多方面与`BATCH_JOB_EXECUTION`表类似,并且对于每个创建的`JobExecution`,每个`Step`总是至少有一个条目。下面的清单显示了`BATCH_STEP_EXECUTION`表的定义:
```
CREATE TABLE BATCH_STEP_EXECUTION (
STEP_EXECUTION_ID BIGINT PRIMARY KEY ,
VERSION BIGINT NOT NULL,
STEP_NAME VARCHAR(100) NOT NULL,
JOB_EXECUTION_ID BIGINT NOT NULL,
START_TIME TIMESTAMP NOT NULL ,
END_TIME TIMESTAMP DEFAULT NULL,
STATUS VARCHAR(10),
COMMIT_COUNT BIGINT ,
READ_COUNT BIGINT ,
FILTER_COUNT BIGINT ,
WRITE_COUNT BIGINT ,
READ_SKIP_COUNT BIGINT ,
WRITE_SKIP_COUNT BIGINT ,
PROCESS_SKIP_COUNT BIGINT ,
ROLLBACK_COUNT BIGINT ,
EXIT_CODE VARCHAR(20) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED TIMESTAMP,
constraint JOB_EXECUTION_STEP_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
```
下面的列表描述了每个列:
* `STEP_EXECUTION_ID`:唯一标识此执行的主键。可以通过调用`StepExecution`对象的`getId`方法获得此列的值。
* `VERSION`:见[Version](#metaDataVersion)
* `STEP_NAME`:此执行所属的步骤的名称。
* `JOB_EXECUTION_ID`:来自`BATCH_JOB_EXECUTION`表的外键。它指示了`JobExecution`这个`StepExecution`所属的`JobExecution`。对于给定的`JobExecution`名称,可能只有一个`StepExecution`
* `START_TIME`:表示开始执行的时间的时间戳。
* `END_TIME`:时间戳表示执行完成的时间,无论成功还是失败。此列中的空值(即使作业当前未运行)表示存在某种类型的错误,并且框架在失败之前无法执行最后一次保存。
* `STATUS`:表示执行状态的字符串。这可能是`COMPLETED``STARTED`等。此列的对象表示是`BatchStatus`枚举。
* `COMMIT_COUNT`:在执行过程中,步骤提交事务的次数。
* `READ_COUNT`:执行过程中读取的项数。
* `FILTER_COUNT`:从该执行中筛选出的项的数量。
* `WRITE_COUNT`:在执行过程中写入和提交的项的数量。
* `READ_SKIP_COUNT`:在执行过程中读时跳过的项的数量。
* `WRITE_SKIP_COUNT`:在执行过程中在写入时跳过的项数。
* `PROCESS_SKIP_COUNT`:在此执行过程中处理过程中跳过的项的数量。
* `ROLLBACK_COUNT`:执行过程中的回滚次数。请注意,此计数包括每次发生回滚时,包括用于重试的回滚和在跳过恢复过程中的回滚。
* `EXIT_CODE`:表示执行的退出代码的字符串。在命令行作业的情况下,可以将其转换为数字。
* `EXIT_MESSAGE`:表示作业如何退出的更详细描述的字符串。在失败的情况下,这可能包括尽可能多的堆栈跟踪。
* `LAST_UPDATED`:时间戳表示此执行最后一次被持久化的时间。
### `BATCH_JOB_EXECUTION_CONTEXT`
`BATCH_JOB_EXECUTION_CONTEXT`表包含与`Job``ExecutionContext`相关的所有信息。这里正好有一个`Job``ExecutionContext`per`JobExecution`,它包含特定作业执行所需的所有作业级别数据。该数据通常表示故障后必须检索的状态,因此`JobInstance`可以“从它停止的地方开始”。下面的清单显示了`BATCH_JOB_EXECUTION_CONTEXT`表的定义:
```
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
JOB_EXECUTION_ID BIGINT PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT CLOB,
constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
```
下面的列表描述了每一列:
* `JOB_EXECUTION_ID`:表示上下文所属的`JobExecution`的外键。与给定的执行相关联的行可能不止一个。
* `SHORT_CONTEXT`:`SERIALIZED_CONTEXT`的字符串版本。
* `SERIALIZED_CONTEXT`:整个上下文,序列化。
### `BATCH_STEP_EXECUTION_CONTEXT`
`BATCH_STEP_EXECUTION_CONTEXT`表包含与`Step``ExecutionContext`相关的所有信息。每`StepExecution`正好有一个`ExecutionContext`,它包含了为执行特定步骤而需要持久化的所有数据。该数据通常表示故障后必须检索的状态,这样`JobInstance`就可以“从它停止的地方开始”。下面的清单显示了`BATCH_STEP_EXECUTION_CONTEXT`表的定义:
```
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
STEP_EXECUTION_ID BIGINT PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT CLOB,
constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ;
```
下面的列表描述了每一列:
* `STEP_EXECUTION_ID`:表示上下文所属的`StepExecution`的外键。可能有多个行与给定的执行相关联。
* `SHORT_CONTEXT`:`SERIALIZED_CONTEXT`的字符串版本。
* `SERIALIZED_CONTEXT`:整个上下文,序列化。
### 存档
由于每次运行批处理作业时,多个表中都有条目,因此通常需要为元数据表创建归档策略。这些表本身旨在显示过去发生的事情的记录,并且通常不会影响任何作业的运行,只有一些与重新启动有关的明显例外:
* 该框架使用元数据表来确定特定的`JobInstance`是否已经运行过。如果作业已经运行,并且不能重新启动,那么将抛出一个异常。
* 如果`JobInstance`的条目在未成功完成的情况下被删除,则框架认为该作业是新的,而不是重新启动。
* 如果重新启动了一个作业,框架将使用已持久化到`ExecutionContext`的任何数据来恢复`Job’s`状态。因此,如果作业没有成功完成,从该表中删除任何条目,将阻止它们在再次运行时从正确的点开始。
### 国际和多字节字符
如果你在业务处理中使用多字节字符集(例如中文或西里尔),那么这些字符可能需要在 Spring 批处理模式中持久化。许多用户发现,只需将模式更改为`VARCHAR`列的长度的两倍就足够了。其他人更喜欢将[JobRepository](job.html#configuringJobRepository)配置为`max-varchar-length`列长度的一半。一些用户还报告说,他们在模式定义中使用`NVARCHAR`代替`VARCHAR`。最佳结果取决于数据库平台和本地配置数据库服务器的方式。
### 建立元数据表索引的建议
Spring 批处理为几个常见的数据库平台的核心 JAR 文件中的元数据表提供了 DDL 示例。索引声明不包含在该 DDL 中,因为用户可能希望索引的方式有太多的变化,这取决于他们的精确平台、本地约定以及作业如何操作的业务需求。下面的内容提供了一些指示,说明 Spring Batch 提供的 DAO 实现将在`WHERE`子句中使用哪些列,以及它们可能被使用的频率,以便各个项目可以就索引做出自己的决定:
| Default Table Name | Where Clause |频率|
|----------------------|-----------------------------------------|-------------------------------------------------------------------|
| BATCH\_JOB\_INSTANCE | JOB\_NAME = ? and JOB\_KEY = ? |每次有工作要做的时候|
|BATCH\_JOB\_EXECUTION | JOB\_INSTANCE\_ID = ? |每次重新启动作业时|
|BATCH\_STEP\_EXECUTION| VERSION = ? |在提交间隔上,a.k.a.chunk(以及<br/>步骤的开始和结束)|
|BATCH\_STEP\_EXECUTION|STEP\_NAME = ? and JOB\_EXECUTION\_ID = ?|在每一步执行之前|
\ No newline at end of file
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册