# 常见的批处理模式 ## 常见的批处理模式 XMLJavaBoth 一些批处理作业可以完全由 Spring 批处理中现成的组件组装而成。例如,`ItemReader`和`ItemWriter`实现可以被配置为覆盖广泛的场景。然而,在大多数情况下,必须编写自定义代码。应用程序开发人员的主要 API 入口点是`Tasklet`、`ItemReader`、`ItemWriter`和各种侦听器接口。大多数简单的批处理作业可以使用 Spring 批处理中的现成输入`ItemReader`,但是在处理和编写过程中通常存在需要开发人员实现`ItemWriter`或`ItemProcessor`的定制问题。 在这一章中,我们提供了几个自定义业务逻辑中常见模式的示例。这些示例主要以侦听器接口为特征。应该注意的是,如果合适的话,`ItemReader`或`ItemWriter`也可以实现侦听器接口。 ### 记录项目处理和失败 一个常见的用例是需要在一个步骤中对错误进行特殊处理,逐项处理,可能是登录到一个特殊的通道,或者将一条记录插入到数据库中。面向块的`Step`(从 Step Factory Bean 创建)允许用户实现这个用例,它使用一个简单的`ItemReadListener`表示`read`上的错误,使用一个`ItemWriteListener`表示`write`上的错误。以下代码片段演示了记录读写失败的侦听器: ``` public class ItemFailureLoggerListener extends ItemListenerSupport { private static Log logger = LogFactory.getLog("item.error"); public void onReadError(Exception ex) { logger.error("Encountered error on read", e); } public void onWriteError(Exception ex, List items) { logger.error("Encountered error on write", ex); } } ``` 在实现了这个侦听器之后,必须用一个步骤对其进行注册。 下面的示例展示了如何用 XML 中的一个步骤注册侦听器: XML 配置 ``` ... ``` 下面的示例展示了如何使用 STEP Java 注册侦听器: Java 配置 ``` @Bean public Step simpleStep() { return this.stepBuilderFactory.get("simpleStep") ... .listener(new ItemFailureLoggerListener()) .build(); } ``` | |如果你的侦听器在`onError()`方法中执行任何操作,则它必须位于
将被回滚的事务中。如果需要在`onError()`方法中使用事务性
资源,例如数据库,请考虑向该方法添加声明性
事务(有关详细信息,请参见 Spring Core Reference Guide),并给其
传播属性一个值`REQUIRES_NEW`。| |---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ### 由于业务原因手动停止作业 Spring Batch 通过`JobOperator`接口提供了`stop()`方法,但这实际上是供操作员而不是应用程序程序员使用的。有时,从业务逻辑中停止作业执行更方便或更有意义。 最简单的方法是抛出`RuntimeException`(这种方法既不会无限期地重试,也不会被跳过)。例如,可以使用自定义异常类型,如下例所示: ``` public class PoisonPillItemProcessor implements ItemProcessor { @Override public T process(T item) throws Exception { if (isPoisonPill(item)) { throw new PoisonPillException("Poison pill detected: " + item); } return item; } } ``` 另一种停止执行步骤的简单方法是从`ItemReader`返回`null`,如以下示例所示: ``` public class EarlyCompletionItemReader implements ItemReader { private ItemReader delegate; public void setDelegate(ItemReader delegate) { ... } public T read() throws Exception { T item = delegate.read(); if (isEndItem(item)) { return null; // end the step here } return item; } } ``` 前面的示例实际上依赖于这样一个事实,即存在`CompletionPolicy`策略的默认实现,当要处理的项是`null`时,该策略发出一个完整批处理的信号。可以实现一个更复杂的完成策略,并通过`SimpleStepFactoryBean`注入`Step`。 下面的示例展示了如何在 XML 中的一个步骤中注入一个完成策略: XML 配置 ``` ``` 下面的示例展示了如何在 Java 中的一个步骤中注入一个完成策略: Java 配置 ``` @Bean public Step simpleStep() { return this.stepBuilderFactory.get("simpleStep") .chunk(new SpecialCompletionPolicy()) .reader(reader()) .writer(writer()) .build(); } ``` 一种替代方法是在`StepExecution`中设置一个标志,这是由`Step`实现在框架中检查项之间的处理。要实现此替代方案,我们需要访问当前的`StepExecution`,这可以通过实现`StepListener`并将其注册到`Step`来实现。下面的示例展示了一个设置标志的侦听器: ``` public class CustomItemWriter extends ItemListenerSupport implements StepListener { private StepExecution stepExecution; public void beforeStep(StepExecution stepExecution) { this.stepExecution = stepExecution; } public void afterRead(Object item) { if (isPoisonPill(item)) { stepExecution.setTerminateOnly(); } } } ``` 设置标志时,默认的行为是抛出`JobInterruptedException`。这种行为可以通过`StepInterruptionPolicy`来控制。然而,唯一的选择是抛出或不抛出异常,因此这始终是工作的异常结束。 ### 添加页脚记录 通常,当写入平面文件时,在所有处理完成后,必须在文件的末尾附加一个“页脚”记录。这可以使用由 Spring 批提供的`FlatFileFooterCallback`接口来实现。`FlatFileFooterCallback`(及其对应的`FlatFileHeaderCallback`)是`FlatFileItemWriter`的可选属性,可以添加到项编写器中。 下面的示例展示了如何在 XML 中使用`FlatFileHeaderCallback`和`FlatFileFooterCallback`: XML 配置 ``` ``` 下面的示例展示了如何在 Java 中使用`FlatFileHeaderCallback`和`FlatFileFooterCallback`: Java 配置 ``` @Bean public FlatFileItemWriter itemWriter(Resource outputResource) { return new FlatFileItemWriterBuilder() .name("itemWriter") .resource(outputResource) .lineAggregator(lineAggregator()) .headerCallback(headerCallback()) .footerCallback(footerCallback()) .build(); } ``` 页脚回调接口只有一个方法,在必须写入页脚时调用该方法,如以下接口定义所示: ``` public interface FlatFileFooterCallback { void writeFooter(Writer writer) throws IOException; } ``` #### 编写摘要页脚 涉及页脚记录的一个常见要求是在输出过程中聚合信息,并将这些信息附加到文件的末尾。这个页脚通常用作文件的摘要或提供校验和。 例如,如果一个批处理作业正在将`Trade`记录写入一个平面文件,并且要求将所有`Trades`的总量放入一个页脚中,那么可以使用以下`ItemWriter`实现: ``` public class TradeItemWriter implements ItemWriter, FlatFileFooterCallback { private ItemWriter delegate; private BigDecimal totalAmount = BigDecimal.ZERO; public void write(List items) throws Exception { BigDecimal chunkTotal = BigDecimal.ZERO; for (Trade trade : items) { chunkTotal = chunkTotal.add(trade.getAmount()); } delegate.write(items); // After successfully writing all items totalAmount = totalAmount.add(chunkTotal); } public void writeFooter(Writer writer) throws IOException { writer.write("Total Amount Processed: " + totalAmount); } public void setDelegate(ItemWriter delegate) {...} } ``` 这个`TradeItemWriter`存储了一个`totalAmount`值,该值随着从每个`Trade`条目中写入的`amount`而增加。在处理最后一个`Trade`之后,框架调用`writeFooter`,这将`totalAmount`放入文件。请注意,`write`方法使用了一个临时变量`chunkTotal`,该变量存储了块中`Trade`数量的总和。这样做是为了确保,如果在`write`方法中发生跳过,`totalAmount`保持不变。只有在`write`方法结束时,在保证不抛出异常之后,我们才更新`totalAmount`。 为了调用`writeFooter`方法,`TradeItemWriter`(它实现`FlatFileFooterCallback`)必须连接到`FlatFileItemWriter`中,作为`footerCallback`。 下面的示例展示了如何在 XML 中连接`TradeItemWriter`: XML 配置 ``` ``` 下面的示例展示了如何在 Java 中连接`TradeItemWriter`: Java 配置 ``` @Bean public TradeItemWriter tradeItemWriter() { TradeItemWriter itemWriter = new TradeItemWriter(); itemWriter.setDelegate(flatFileItemWriter(null)); return itemWriter; } @Bean public FlatFileItemWriter flatFileItemWriter(Resource outputResource) { return new FlatFileItemWriterBuilder() .name("itemWriter") .resource(outputResource) .lineAggregator(lineAggregator()) .footerCallback(tradeItemWriter()) .build(); } ``` 到目前为止,只有当`Step`不可重启时,`TradeItemWriter`的写入方式才能正确地执行。这是因为类是有状态的(因为它存储`totalAmount`),但是`totalAmount`不会持久化到数据库中。因此,在重新启动的情况下无法检索到它。为了使这个类重新启动,`ItemStream`接口应该与`open`和`update`方法一起实现,如下面的示例所示: ``` public void open(ExecutionContext executionContext) { if (executionContext.containsKey("total.amount") { totalAmount = (BigDecimal) executionContext.get("total.amount"); } } public void update(ExecutionContext executionContext) { executionContext.put("total.amount", totalAmount); } ``` 更新方法将最新版本的`totalAmount`存储到`ExecutionContext`,就在该对象持久化到数据库之前。open 方法从`ExecutionContext`中检索任何已存在的`totalAmount`,并将其用作处理的起点,从而允许`TradeItemWriter`在重新启动时在上次运行`Step`时未启动的地方进行拾取。 ### 基于项目阅读器的驾驶查询 在[关于读者和作家的章节](readersAndWriters.html)中,讨论了利用分页进行数据库输入的问题。许多数据库供应商(例如 DB2)都有非常悲观的锁定策略,如果正在读取的表也需要由在线应用程序的其他部分使用,这些策略可能会导致问题。此外,在非常大的数据集上打开游标可能会导致某些供应商的数据库出现问题。因此,许多项目更喜欢使用“驱动查询”方法来读取数据。这种方法的工作原理是对键进行迭代,而不是对需要返回的整个对象进行迭代,如下图所示: ![驾驶查询工作](./images/drivingQueryExample.png) 图 1。驾驶查询工作 正如你所看到的,前面图片中显示的示例使用了与基于游标的示例中使用的相同的“foo”表。但是,在 SQL 语句中只选择了 ID,而不是选择整行。因此,不是从`read`返回`FOO`对象,而是返回`Integer`对象。然后可以使用这个数字来查询“details”,这是一个完整的`Foo`对象,如下图所示: ![驱动查询示例](./images/drivingQueryJob.png) 图 2。驱动查询示例 应该使用`ItemProcessor`将从驱动查询中获得的键转换为完整的`Foo`对象。现有的 DAO 可以用于基于该键查询完整的对象。 ### 多行记录 虽然平面文件的情况通常是,每个记录都被限制在单行中,但一个文件的记录可能跨越多行,并具有多种格式,这是很常见的。下面摘自一个文件,展示了这种安排的一个例子: ``` HEA;0013100345;2007-02-15 NCU;Smith;Peter;;T;20014539;F BAD;;Oak Street 31/A;;Small Town;00235;IL;US FOT;2;2;267.34 ``` 以“hea”开头的行和以“fot”开头的行之间的所有内容都被视为一条记录。为了正确处理这种情况,必须考虑以下几点: * 而不是一次读取一条记录,`ItemReader`必须将多行记录的每一行作为一个组来读取,以便它可以完整地传递给`ItemWriter`。 * 每一种行类型可能需要以不同的方式进行标记。 由于单个记录跨越多行,并且我们可能不知道有多少行,因此`ItemReader`必须小心,以始终读取整个记录。为了做到这一点,应该将自定义`ItemReader`实现为`FlatFileItemReader`的包装器。 下面的示例展示了如何在 XML 中实现自定义`ItemReader`: XML 配置 ``` ``` 下面的示例展示了如何在 Java 中实现自定义`ItemReader`: Java 配置 ``` @Bean public MultiLineTradeItemReader itemReader() { MultiLineTradeItemReader itemReader = new MultiLineTradeItemReader(); itemReader.setDelegate(flatFileItemReader()); return itemReader; } @Bean public FlatFileItemReader flatFileItemReader() { FlatFileItemReader reader = new FlatFileItemReaderBuilder<>() .name("flatFileItemReader") .resource(new ClassPathResource("data/iosample/input/multiLine.txt")) .lineTokenizer(orderFileTokenizer()) .fieldSetMapper(orderFieldSetMapper()) .build(); return reader; } ``` 为了确保每一行都被正确地标记,这对于固定长度的输入尤其重要,`PatternMatchingCompositeLineTokenizer`可以在委托`FlatFileItemReader`上使用。有关更多详细信息,请参见[`FlatFileItemReader`中的 Readers and Writers 章节]。然后,委托读取器使用`PassThroughFieldSetMapper`将每一行的`FieldSet`传递到包装`ItemReader`。 下面的示例展示了如何确保每一行都正确地在 XML 中进行了标记: XML 内容 ``` ``` 下面的示例展示了如何确保每一行都在 Java 中被正确地标记: Java 内容 ``` @Bean public PatternMatchingCompositeLineTokenizer orderFileTokenizer() { PatternMatchingCompositeLineTokenizer tokenizer = new PatternMatchingCompositeLineTokenizer(); Map tokenizers = new HashMap<>(4); tokenizers.put("HEA*", headerRecordTokenizer()); tokenizers.put("FOT*", footerRecordTokenizer()); tokenizers.put("NCU*", customerLineTokenizer()); tokenizers.put("BAD*", billingAddressLineTokenizer()); tokenizer.setTokenizers(tokenizers); return tokenizer; } ``` 这个包装器必须能够识别记录的结尾,以便它可以在其委托上连续调用`read()`,直到达到结尾。对于读取的每一行,包装器应该构建要返回的项。一旦到达页脚,就可以将项目返回以交付给`ItemProcessor`和`ItemWriter`,如以下示例所示: ``` private FlatFileItemReader
delegate; public Trade read() throws Exception { Trade t = null; for (FieldSet line = null; (line = this.delegate.read()) != null;) { String prefix = line.readString(0); if (prefix.equals("HEA")) { t = new Trade(); // Record must start with header } else if (prefix.equals("NCU")) { Assert.notNull(t, "No header was found."); t.setLast(line.readString(1)); t.setFirst(line.readString(2)); ... } else if (prefix.equals("BAD")) { Assert.notNull(t, "No header was found."); t.setCity(line.readString(4)); t.setState(line.readString(6)); ... } else if (prefix.equals("FOT")) { return t; // Record must end with footer } } Assert.isNull(t, "No 'END' was found."); return null; } ``` ### 执行系统命令 许多批处理作业要求从批处理作业中调用外部命令。这样的进程可以由调度器单独启动,但是有关运行的公共元数据的优势将会丧失。此外,一个多步骤的工作也需要被分解成多个工作。 因为这种需求是如此普遍, Spring Batch 提供了用于调用系统命令的`Tasklet`实现。 下面的示例展示了如何调用 XML 中的外部命令: XML 配置 ``` ``` 下面的示例展示了如何在 Java 中调用外部命令: Java 配置 ``` @Bean public SystemCommandTasklet tasklet() { SystemCommandTasklet tasklet = new SystemCommandTasklet(); tasklet.setCommand("echo hello"); tasklet.setTimeout(5000); return tasklet; } ``` ### 未找到输入时的处理步骤完成 在许多批处理场景中,在数据库或文件中找不到要处理的行并不是例外情况。将`Step`简单地视为未找到工作,并在读取 0 项的情况下完成。所有的`ItemReader`实现都是在 Spring 批处理中提供的,默认为这种方法。如果即使存在输入,也没有写出任何内容,这可能会导致一些混乱(如果文件被错误命名或出现类似问题,通常会发生这种情况)。因此,应该检查元数据本身,以确定框架需要处理多少工作。然而,如果发现没有输入被认为是例外情况怎么办?在这种情况下,最好的解决方案是通过编程方式检查元数据,以确保未处理任何项目并导致失败。因为这是一个常见的用例, Spring Batch 提供了一个具有这种功能的侦听器,如`NoWorkFoundStepExecutionListener`的类定义所示: ``` public class NoWorkFoundStepExecutionListener extends StepExecutionListenerSupport { public ExitStatus afterStep(StepExecution stepExecution) { if (stepExecution.getReadCount() == 0) { return ExitStatus.FAILED; } return null; } } ``` 前面的`StepExecutionListener`在“afterstep”阶段检查`StepExecution`的`readCount`属性,以确定是否没有读取任何项。如果是这种情况,将返回一个退出代码`FAILED`,表示`Step`应该失败。否则,将返回`null`,这不会影响`Step`的状态。 ### 将数据传递给未来的步骤 将信息从一个步骤传递到另一个步骤通常是有用的。这可以通过`ExecutionContext`来完成。问题是有两个`ExecutionContexts`:一个在`Step`水平,一个在`Job`水平。`Step``ExecutionContext`只保留到步骤的长度,而`Job``ExecutionContext`则保留到整个`Job`。另一方面,`Step``ExecutionContext`每次`Step`提交一个块时都会更新`Job``ExecutionContext`,而`Step`只在每个`Step`的末尾更新。 这种分离的结果是,当`Step`执行时,所有数据都必须放在`Step``ExecutionContext`中。这样做可以确保在`Step`运行时正确地存储数据。如果数据被存储到`Job``ExecutionContext`,那么在`Step`执行期间它不会被持久化。如果`Step`失败,则该数据丢失。 ``` public class SavingItemWriter implements ItemWriter { private StepExecution stepExecution; public void write(List items) throws Exception { // ... ExecutionContext stepContext = this.stepExecution.getExecutionContext(); stepContext.put("someKey", someObject); } @BeforeStep public void saveStepExecution(StepExecution stepExecution) { this.stepExecution = stepExecution; } } ``` 要使将来`Steps`可以使用该数据,必须在步骤完成后将其“提升”到`Job``ExecutionContext`。 Spring Batch 为此提供了`ExecutionContextPromotionListener`。侦听器必须配置与必须提升的`ExecutionContext`中的数据相关的键。它还可以配置一个退出代码模式列表(`COMPLETED`是默认的)。与所有侦听器一样,它必须在`Step`上注册。 下面的示例展示了如何在 XML 中将一个步骤提升到`Job``ExecutionContext`: XML 配置 ``` ... someKey ``` 下面的示例展示了如何在 Java 中将一个步骤提升到`Job``ExecutionContext`: Java 配置 ``` @Bean public Job job1() { return this.jobBuilderFactory.get("job1") .start(step1()) .next(step1()) .build(); } @Bean public Step step1() { return this.stepBuilderFactory.get("step1") .chunk(10) .reader(reader()) .writer(savingWriter()) .listener(promotionListener()) .build(); } @Bean public ExecutionContextPromotionListener promotionListener() { ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener(); listener.setKeys(new String[] {"someKey"}); return listener; } ``` 最后,必须从`Job``ExecutionContext`中检索保存的值,如下例所示: ``` public class RetrievingItemWriter implements ItemWriter { private Object someObject; public void write(List items) throws Exception { // ... } @BeforeStep public void retrieveInterstepData(StepExecution stepExecution) { JobExecution jobExecution = stepExecution.getJobExecution(); ExecutionContext jobContext = jobExecution.getExecutionContext(); this.someObject = jobContext.get("someKey"); } } ```