spring-batch-integration.md 41.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091
# Spring 批处理集成

##  Spring 批处理集成

XMLJavaBoth

###  Spring 批处理集成介绍

Spring 批处理的许多用户可能会遇到不在 Spring 批处理范围内的需求,但这些需求可以通过使用 Spring 集成来高效而简洁地实现。相反, Spring 集成用户可能会遇到 Spring 批处理需求,并且需要一种有效地集成这两个框架的方法。在这种情况下,出现了几种模式和用例, Spring 批处理集成解决了这些需求。

Spring 批处理和 Spring 集成之间的界限并不总是清晰的,但有两条建议可以提供帮助:考虑粒度,并应用公共模式。这些常见模式中的一些在本参考手册一节中进行了描述。

将消息传递添加到批处理过程中,可以实现操作的自动化,还可以分离关键关注事项并制定策略。例如,消息可能会触发作业执行,然后消息的发送可以通过多种方式公开。或者,当作业完成或失败时,该事件可能会触发要发送的消息,而这些消息的消费者可能有与应用程序本身无关的操作问题。消息传递也可以嵌入到作业中(例如,通过通道读取或写入用于处理的项)。远程分区和远程分块提供了在多个工作人员上分配工作负载的方法。

本节涵盖以下关键概念:

* [命名空间支持](#namespace-support)

* [通过消息启动批处理作业](#launching-batch-jobs-through-messages)

* [提供反馈信息](#providing-feedback-with-informational-messages)

* [异步处理器](#asynchronous-processors)

* [外部化批处理过程执行](#externalizing-batch-process-execution)

#### 名称空间支持

Spring 自批处理集成 1.3 以来,添加了专用的 XML 命名空间支持,目的是提供更简单的配置体验。为了激活命名空间,请将以下命名空间声明添加到 Spring XML 应用程序上下文文件中:

```
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
  xsi:schemaLocation="
    http://www.springframework.org/schema/batch-integration
    https://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd">

    ...

</beans>
```

用于 Spring 批处理集成的完全配置的 Spring XML 应用程序上下文文件可能如下所示:

```
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:batch="http://www.springframework.org/schema/batch"
  xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
  xsi:schemaLocation="
    http://www.springframework.org/schema/batch-integration
    https://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd
    http://www.springframework.org/schema/batch
    https://www.springframework.org/schema/batch/spring-batch.xsd
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd">

    ...

</beans>
```

也允许将版本号附加到引用的 XSD 文件中,但是,由于无版本声明总是使用最新的模式,因此我们通常不建议将版本号附加到 XSD 名称中。添加版本号可能会在更新 Spring 批处理集成依赖项时产生问题,因为它们可能需要 XML 模式的最新版本。

#### 通过消息启动批处理作业

当通过使用核心 Spring 批处理 API 启动批处理作业时,你基本上有两个选项:

* 在命令行中,使用`CommandLineJobRunner`

* 在编程上,使用`JobOperator.start()``JobLauncher.run()`

例如,当通过使用 shell 脚本调用批处理作业时,你可能希望使用`CommandLineJobRunner`。或者,你可以直接使用`JobOperator`(例如,当使用 Spring 批处理作为 Web 应用程序的一部分时)。然而,更复杂的用例呢?也许你需要轮询远程 FTP 服务器来检索批处理作业的数据,或者你的应用程序必须同时支持多个不同的数据源。例如,你不仅可以从 Web 接收数据文件,还可以从 FTP 和其他来源接收数据文件。在调用 Spring 批处理之前,可能需要对输入文件进行额外的转换。

因此,使用 Spring 集成及其众多适配器来执行批处理作业将会强大得多。例如,你可以使用*文件入站通道适配器*来监视文件系统中的一个目录,并在输入文件到达时立即启动批处理作业。此外,你还可以创建 Spring 集成流,这些集成流使用多个不同的适配器,仅使用配置就可以轻松地从多个源同时为批处理作业摄取数据。使用 Spring 集成实现所有这些场景是很容易的,因为它允许对`JobLauncher`进行解耦、事件驱动的执行。

Spring 批处理集成提供了`JobLaunchingMessageHandler`类,你可以使用它来启动批处理作业。`JobLaunchingMessageHandler`的输入由 Spring 集成消息提供,该消息的有效负载类型为`JobLaunchRequest`。这个类是围绕需要启动的`Job`和启动批处理作业所必需的`JobParameters`的包装器。

下面的图像演示了典型的 Spring 集成消息流,以便启动批处理作业。[Enterprise 集成模式网站](https://www.enterpriseintegrationpatterns.com/toc.html)提供了消息传递图标及其描述的完整概述。

![启动批处理作业](https://docs.spring.io/spring-batch/docs/current/reference/html/images/launch-batch-job.png)

图 1。启动批处理作业

##### 将文件转换为 joblaunchrequest

```
package io.spring.sbi;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;

import java.io.File;

public class FileMessageToJobRequest {
    private Job job;
    private String fileParameterName;

    public void setFileParameterName(String fileParameterName) {
        this.fileParameterName = fileParameterName;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder =
            new JobParametersBuilder();

        jobParametersBuilder.addString(fileParameterName,
            message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }
}
```

##### the`JobExecution`响应

当执行批处理作业时,将返回一个`JobExecution`实例。此实例可用于确定执行的状态。如果`JobExecution`能够成功创建,则无论实际执行是否成功,它总是被返回。

如何返回`JobExecution`实例的确切行为取决于所提供的`TaskExecutor`。如果使用`synchronous`(单线程)`TaskExecutor`实现,则只返回`JobExecution`响应`after`作业完成。当使用`asynchronous``TaskExecutor`时,将立即返回`JobExecution`实例。然后,用户可以使用`JobExecution``id`实例(带有`JobExecution.getJobId()`),并使用`JobExplorer`查询`JobRepository`中的作业更新状态。有关更多信息,请参阅关于[查询存储库](job.html#queryingRepository)的 Spring 批参考文档。

#####  Spring 批处理集成配置

考虑这样一种情况:需要创建一个文件`inbound-channel-adapter`来监听所提供的目录中的 CSV 文件,将它们交给转换器(`FileMessageToJobRequest`),通过*工作启动网关*启动作业,然后用`logging-channel-adapter`记录`JobExecution`的输出。

下面的示例展示了如何在 XML 中配置这种常见的情况:

XML 配置

```
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>

<int-file:inbound-channel-adapter id="filePoller"
    channel="inboundFileChannel"
    directory="file:/tmp/myfiles/"
    filename-pattern="*.csv">
  <int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>

<int:transformer input-channel="inboundFileChannel"
    output-channel="outboundJobRequestChannel">
  <bean class="io.spring.sbi.FileMessageToJobRequest">
    <property name="job" ref="personJob"/>
    <property name="fileParameterName" value="input.file.name"/>
  </bean>
</int:transformer>

<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
    reply-channel="jobLaunchReplyChannel"/>

<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
```

下面的示例展示了如何在 Java 中配置这种常见情况:

Java 配置

```
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setFileParameterName("input.file.name");
    fileMessageToJobRequest.setJob(personJob());
    return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
    simpleJobLauncher.setJobRepository(jobRepository);
    simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);

    return jobLaunchingGateway;
}

@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
    return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/myfiles")).
                    filter(new SimplePatternFileListFilter("*.csv")),
            c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
            transform(fileMessageToJobRequest()).
            handle(jobLaunchingGateway).
            log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
            get();
}
```

##### 示例 itemreader 配置

现在我们正在轮询文件和启动作业,我们需要配置我们的 Spring 批处理`ItemReader`(例如),以使用在名为“input.file.name”的作业参数所定义的位置找到的文件,如下面的 Bean 配置所示:

下面的 XML 示例展示了必要的配置 Bean:

XML 配置

```
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader"
    scope="step">
  <property name="resource" value="file://#{jobParameters['input.file.name']}"/>
    ...
</bean>
```

下面的 Java 示例展示了必要的配置 Bean:

Java 配置

```
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
    FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
    flatFileItemReader.setResource(new FileSystemResource(resource));
...
    return flatFileItemReader;
}
```

在前面的示例中,主要的关注点是注入`#{jobParameters['input.file.name']}`的值作为资源属性值,并将`ItemReader` Bean 设置为具有*步骤范围*。将 Bean 设置为具有步骤作用域利用了后期绑定支持,这允许访问`jobParameters`变量。

### 作业启动网关的可用属性

作业启动网关具有以下属性,你可以设置这些属性来控制作业:

* `id`:标识底层的 Spring  Bean 定义,它是以下两种定义之一的实例:

  * `EventDrivenConsumer`

  * `PollingConsumer`(准确的实现取决于组件的输入通道是`SubscribableChannel`还是`PollableChannel`。)

* `auto-startup`:布尔标志,指示端点在启动时应自动启动。默认值为*true*

* `request-channel`:此端点的输入`MessageChannel`

* `reply-channel`:`MessageChannel`将结果`JobExecution`的有效载荷发送到该负载。

* `reply-timeout`:允许你指定此网关在抛出异常之前等待多长时间(以毫秒为单位)以将答复消息成功发送到答复通道。此属性仅在通道可能阻塞时才应用(例如,当使用当前已满的有界队列通道时)。另外,请记住,当发送到`DirectChannel`时,调用发生在发送方的线程中。因此,发送操作的失败可能是由更下游的其他组件引起的。`reply-timeout`属性映射到底层`sendTimeout`实例的`sendTimeout`属性。如果没有指定,则属性默认为 \<emphasis\>-1\</emphasis\>,这意味着,默认情况下,`Gateway`无限期地等待。

* `job-launcher`:可选。接受自定义`JobLauncher` Bean 引用。如果没有指定适配器,则重新使用在`jobLauncher``id`下注册的实例。如果不存在缺省实例,则抛出一个异常。

* `order`:指定当此端点作为订阅服务器连接到`SubscribableChannel`时的调用顺序。

### 子元素

`Gateway`接收来自`PollableChannel`的消息时,你必须为`Poller`提供一个全局默认值`Poller`,或者为`Job Launching Gateway`提供一个子元素。

下面的示例展示了如何用 XML 提供一个 Poller:

XML 配置

```
<batch-int:job-launching-gateway request-channel="queueChannel"
    reply-channel="replyChannel" job-launcher="jobLauncher">
  <int:poller fixed-rate="1000">
</batch-int:job-launching-gateway>
```

下面的示例展示了如何用 Java 提供一个 Poller:

Java 配置

```
@Bean
@ServiceActivator(inputChannel = "queueChannel", poller = @Poller(fixedRate="1000"))
public JobLaunchingGateway sampleJobLaunchingGateway() {
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher());
    jobLaunchingGateway.setOutputChannel(replyChannel());
    return jobLaunchingGateway;
}
```

#### 提供反馈信息

Spring 由于批处理作业可以运行很长时间,因此提供进度信息通常是至关重要的。例如,如果批处理作业的某些部分或所有部分都失败了,利益相关者可能希望得到通知。 Spring 批处理为正在通过以下方式收集的此信息提供支持:

* 活动轮询

* 事件驱动侦听器

在异步启动 Spring 批处理作业时(例如,通过使用`Job Launching Gateway`),将返回一个`JobExecution`实例。因此,通过使用`JobExplorer``JobRepository`检索`JobExecution`的更新实例,`JobExecution.getJobId()`可用于连续轮询状态更新。然而,这被认为是次优的,事件驱动的方法应该是首选的。

因此, Spring 批提供了侦听器,包括三个最常用的侦听器:

* `StepListener`

* `ChunkListener`

* `JobExecutionListener`

在下图所示的示例中, Spring 批处理作业已配置为`StepExecutionListener`。因此, Spring 集成接收并处理事件之前或之后的任何步骤。例如,接收到的`StepExecution`可以通过使用`Router`进行检查。基于该检查的结果,可以发生各种事情(例如将消息路由到邮件出站通道适配器),以便可以基于某些条件发送出电子邮件通知。

![处理信息消息](https://docs.spring.io/spring-batch/docs/current/reference/html/images/handling-informational-messages.png)

图 2。处理信息消息

下面由两部分组成的示例展示了侦听器如何配置为向`Gateway`事件发送消息到`StepExecution`,并将其输出记录到`logging-channel-adapter`

首先,创建通知集成 bean。

下面的示例展示了如何在 XML 中创建通知集成 bean:

XML 配置

```
<int:channel id="stepExecutionsChannel"/>

<int:gateway id="notificationExecutionsListener"
    service-interface="org.springframework.batch.core.StepExecutionListener"
    default-request-channel="stepExecutionsChannel"/>

<int:logging-channel-adapter channel="stepExecutionsChannel"/>
```

下面的示例展示了如何在 Java 中创建通知集成 bean:

Java 配置

```
@Bean
@ServiceActivator(inputChannel = "stepExecutionsChannel")
public LoggingHandler loggingHandler() {
    LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.WARN);
    adapter.setLoggerName("TEST_LOGGER");
    adapter.setLogExpressionString("headers.id + ': ' + payload");
    return adapter;
}

@MessagingGateway(name = "notificationExecutionsListener", defaultRequestChannel = "stepExecutionsChannel")
public interface NotificationExecutionListener extends StepExecutionListener {}
```

|   |你需要将`@IntegrationComponentScan`注释添加到配置中。|
|---|---------------------------------------------------------------------------------|

其次,修改工作以添加一个步骤级侦听器。

下面的示例展示了如何在 XML 中添加一个步骤级侦听器:

XML 配置

```
<job id="importPayments">
    <step id="step1">
        <tasklet ../>
            <chunk ../>
            <listeners>
                <listener ref="notificationExecutionsListener"/>
            </listeners>
        </tasklet>
        ...
    </step>
</job>
```

下面的示例展示了如何在 Java 中添加一个步骤级侦听器:

Java 配置

```
public Job importPaymentsJob() {
    return jobBuilderFactory.get("importPayments")
        .start(stepBuilderFactory.get("step1")
                .chunk(200)
                .listener(notificationExecutionsListener())
                ...
}
```

#### 异步处理器

异步处理器帮助你扩展项目的处理。在异步处理器用例中,`AsyncItemProcessor`充当调度器,为新线程上的项执行`ItemProcessor`的逻辑。项目完成后,将`Future`传递给要写入的`AsynchItemWriter`

因此,你可以通过使用异步项目处理来提高性能,基本上允许你实现*fork-join *场景。`AsyncItemWriter`收集结果,并在所有结果可用时立即写回块。

下面的示例展示了如何在 XML 中配置`AsyncItemProcessor`:

XML 配置

```
<bean id="processor"
    class="org.springframework.batch.integration.async.AsyncItemProcessor">
  <property name="delegate">
    <bean class="your.ItemProcessor"/>
  </property>
  <property name="taskExecutor">
    <bean class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
  </property>
</bean>
```

下面的示例展示了如何在 XML 中配置`AsyncItemProcessor`:

Java 配置

```
@Bean
public AsyncItemProcessor processor(ItemProcessor itemProcessor, TaskExecutor taskExecutor) {
    AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor();
    asyncItemProcessor.setTaskExecutor(taskExecutor);
    asyncItemProcessor.setDelegate(itemProcessor);
    return asyncItemProcessor;
}
```

`delegate`属性是指你的`ItemProcessor` Bean,而`taskExecutor`属性是指你选择的`TaskExecutor`

下面的示例展示了如何在 XML 中配置`AsyncItemWriter`:

XML 配置

```
<bean id="itemWriter"
    class="org.springframework.batch.integration.async.AsyncItemWriter">
  <property name="delegate">
    <bean id="itemWriter" class="your.ItemWriter"/>
  </property>
</bean>
```

下面的示例展示了如何在 Java 中配置`AsyncItemWriter`:

Java 配置

```
@Bean
public AsyncItemWriter writer(ItemWriter itemWriter) {
    AsyncItemWriter asyncItemWriter = new AsyncItemWriter();
    asyncItemWriter.setDelegate(itemWriter);
    return asyncItemWriter;
}
```

同样,`delegate`属性实际上是对你的`ItemWriter` Bean 的引用。

#### 外部化批处理过程执行

到目前为止讨论的集成方法建议使用 Spring 集成像外壳一样包装 Spring 批处理的用例。然而, Spring 批处理也可以在内部使用 Spring 集成。 Spring 使用这种方法,批处理用户可以将项目甚至块的处理委托给外部进程。这允许你卸载复杂的处理。 Spring 批处理集成为以下方面提供了专门的支持:

* 远程分块

* 远程分区

##### 远程分块

![远程分块](https://docs.spring.io/spring-batch/docs/current/reference/html/images/remote-chunking-sbi.png)

图 3。远程分块

更进一步,还可以使用`ChunkMessageChannelItemWriter`(由 Spring Batch Integration 提供)将块处理外部化,它将项发送出去并收集结果。一旦发送, Spring 批处理继续读取和分组项的过程,而无需等待结果。相反,收集结果并将其集成回 Spring 批处理过程是`ChunkMessageChannelItemWriter`的责任。

通过 Spring 集成,你可以完全控制进程的并发性(例如,通过使用`QueueChannel`而不是`DirectChannel`)。此外,通过依赖 Spring Integration 的通道适配器(例如 JMS 和 AMQP)的丰富集合,你可以将批处理作业的块分配给外部系统进行处理。

带有要远程分块的步骤的作业可能具有类似于 XML 中的以下配置:

XML 配置

```
<job id="personJob">
  <step id="step1">
    <tasklet>
      <chunk reader="itemReader" writer="itemWriter" commit-interval="200"/>
    </tasklet>
    ...
  </step>
</job>
```

带有要远程分块的步骤的作业可能具有类似于 Java 中的以下配置:

Java 配置

```
public Job chunkJob() {
     return jobBuilderFactory.get("personJob")
             .start(stepBuilderFactory.get("step1")
                     .<Person, Person>chunk(200)
                     .reader(itemReader())
                     .writer(itemWriter())
                     .build())
             .build();
 }
```

`ItemReader`引用指向要用于读取 Manager 上的数据的 Bean。正如上面所描述的,`ItemWriter`引用指向一个特殊的`ItemWriter`(称为`ChunkMessageChannelItemWriter`)。处理器(如果有的话)不在 Manager 配置中,因为它是在 Worker 上配置的。在实现用例时,你应该检查任何附加的组件属性,例如油门限制等。

以下 XML 配置提供了基本的 Manager 设置:

XML 配置

```
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<int-jms:outbound-channel-adapter id="jmsRequests" destination-name="requests"/>

<bean id="messagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate">
  <property name="defaultChannel" ref="requests"/>
  <property name="receiveTimeout" value="2000"/>
</bean>

<bean id="itemWriter"
    class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
    scope="step">
  <property name="messagingOperations" ref="messagingTemplate"/>
  <property name="replyChannel" ref="replies"/>
</bean>

<int:channel id="replies">
  <int:queue/>
</int:channel>

<int-jms:message-driven-channel-adapter id="jmsReplies"
    destination-name="replies"
    channel="replies"/>
```

下面的 Java 配置提供了一个基本的 Manager 设置:

Java 配置

```
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL("tcp://localhost:61616");
    return factory;
}

/*
 * Configure outbound flow (requests going to workers)
 */
@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from(requests())
            .handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
            .get();
}

/*
 * Configure inbound flow (replies coming from workers)
 */
@Bean
public QueueChannel replies() {
    return new QueueChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
            .channel(replies())
            .get();
}

/*
 * Configure the ChunkMessageChannelItemWriter
 */
@Bean
public ItemWriter<Integer> itemWriter() {
    MessagingTemplate messagingTemplate = new MessagingTemplate();
    messagingTemplate.setDefaultChannel(requests());
    messagingTemplate.setReceiveTimeout(2000);
    ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter
            = new ChunkMessageChannelItemWriter<>();
    chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
    chunkMessageChannelItemWriter.setReplyChannel(replies());
    return chunkMessageChannelItemWriter;
}
```

前面的配置为我们提供了许多 bean。我们使用 ActiveMQ 和 Spring Integration 提供的入站/出站 JMS 适配器配置消息传递中间件。如图所示,我们的作业步骤引用的`itemWriter` Bean 使用`ChunkMessageChannelItemWriter`在配置的中间件上写块。

现在我们可以转到 Worker 配置,如下面的示例所示:

下面的示例显示了 XML 中的工作配置:

XML 配置

```
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<int:channel id="requests"/>
<int:channel id="replies"/>

<int-jms:message-driven-channel-adapter id="incomingRequests"
    destination-name="requests"
    channel="requests"/>

<int-jms:outbound-channel-adapter id="outgoingReplies"
    destination-name="replies"
    channel="replies">
</int-jms:outbound-channel-adapter>

<int:service-activator id="serviceActivator"
    input-channel="requests"
    output-channel="replies"
    ref="chunkProcessorChunkHandler"
    method="handleChunk"/>

<bean id="chunkProcessorChunkHandler"
    class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler">
  <property name="chunkProcessor">
    <bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
      <property name="itemWriter">
        <bean class="io.spring.sbi.PersonItemWriter"/>
      </property>
      <property name="itemProcessor">
        <bean class="io.spring.sbi.PersonItemProcessor"/>
      </property>
    </bean>
  </property>
</bean>
```

下面的示例展示了 Java 中的 worker 配置:

Java 配置

```
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL("tcp://localhost:61616");
    return factory;
}

/*
 * Configure inbound flow (requests coming from the manager)
 */
@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
            .channel(requests())
            .get();
}

/*
 * Configure outbound flow (replies going to the manager)
 */
@Bean
public DirectChannel replies() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from(replies())
            .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
            .get();
}

/*
 * Configure the ChunkProcessorChunkHandler
 */
@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
    ChunkProcessor<Integer> chunkProcessor
            = new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
    ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler
            = new ChunkProcessorChunkHandler<>();
    chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
    return chunkProcessorChunkHandler;
}
```

这些配置项中的大多数应该在 Manager 配置中看起来很熟悉。工作人员不需要访问 Spring 批`JobRepository`,也不需要访问实际的作业配置文件。主要的 Bean 兴趣是`chunkProcessorChunkHandler``ChunkProcessorChunkHandler``chunkProcessor`属性接受一个已配置的`SimpleChunkProcessor`,在该属性中,你将提供对你的`ItemWriter`(以及你的`ItemProcessor`)的引用,该引用将在 worker 从 Manager 接收块时在其上运行。

有关更多信息,请参见[远程分块](https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html#remoteChunking)的“可伸缩性”章节。

从版本 4.1 开始, Spring 批处理集成引入了`@EnableBatchIntegration`注释,该注释可用于简化远程分块设置。这个注释提供了两个可以在应用程序上下文中自动连接的 bean:

* `RemoteChunkingManagerStepBuilderFactory`:用于配置 Manager 步骤

* `RemoteChunkingWorkerBuilder`:用于配置远程工作者集成流

这些 API 负责配置一些组件,如下图所示:

![远程组块配置](https://docs.spring.io/spring-batch/docs/current/reference/html/images/remote-chunking-config.png)

图 4。远程组块配置

在 Manager 方面,`RemoteChunkingManagerStepBuilderFactory`允许你通过声明以下内容来配置 Manager:

* 项目阅读器读取项目并将其发送给工人

* 将请求发送给工作人员的输出通道(“传出请求”)

* 接收工作人员回复的输入通道(“传入回复”)

a`ChunkMessageChannelItemWriter``MessagingTemplate`不需要显式配置(如果需要,仍然可以显式配置这些参数)。

在 worker 方面,`RemoteChunkingWorkerBuilder`允许你将 worker 配置为:

* 监听 Manager 在输入通道上发送的请求(“传入请求”)

* 对于配置了`ItemProcessor``ItemWriter`的每个请求,调用`handleChunk``ChunkProcessorChunkHandler`方法

* 将输出通道上的回复(“输出回复”)发送给 Manager

不需要显式地配置`SimpleChunkProcessor``ChunkProcessorChunkHandler`(如果需要,可以显式地配置这些参数)。

下面的示例展示了如何使用这些 API:

```
@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingJobConfiguration {

    @Configuration
    public static class ManagerConfiguration {

        @Autowired
        private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;

        @Bean
        public TaskletStep managerStep() {
            return this.managerStepBuilderFactory.get("managerStep")
                       .chunk(100)
                       .reader(itemReader())
                       .outputChannel(requests()) // requests sent to workers
                       .inputChannel(replies())   // replies received from workers
                       .build();
        }

        // Middleware beans setup omitted

    }

    @Configuration
    public static class WorkerConfiguration {

        @Autowired
        private RemoteChunkingWorkerBuilder workerBuilder;

        @Bean
        public IntegrationFlow workerFlow() {
            return this.workerBuilder
                       .itemProcessor(itemProcessor())
                       .itemWriter(itemWriter())
                       .inputChannel(requests()) // requests received from the manager
                       .outputChannel(replies()) // replies sent to the manager
                       .build();
        }

        // Middleware beans setup omitted

    }

}
```

你可以找到远程分块作业[here](https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples#remote-chunking-sample)的完整示例。

##### 远程分区

![远程分区](https://docs.spring.io/spring-batch/docs/current/reference/html/images/remote-partitioning.png)

图 5。远程分区

另一方面,当导致瓶颈的不是项目的处理,而是相关的 I/O 时,远程分区是有用的。使用远程分区,可以将工作分配给执行完整 Spring 批处理步骤的工作人员。因此,每个工作者都有自己的`ItemReader``ItemProcessor``ItemWriter`。为此, Spring 批处理集成提供了`MessageChannelPartitionHandler`

这个`PartitionHandler`接口的实现使用`MessageChannel`实例向远程工作者发送指令并接收他们的响应。这从用于与远程工作者通信的传输(例如 JMS 和 AMQP)中提供了一个很好的抽象。

“可伸缩性”章节中涉及[远程分区](scalability.html#partitioning)的部分概述了配置远程分区所需的概念和组件,并展示了使用默认`TaskExecutorPartitionHandler`在单独的本地执行线程中进行分区的示例。要对多个 JVM 进行远程分区,还需要另外两个组件:

* 一种远程的织物或网格环境

* 支持所需的远程架构或网格环境的`PartitionHandler`实现

与远程组块类似,JMS 可以用作“远程组块结构”。在这种情况下,使用`MessageChannelPartitionHandler`实例作为`PartitionHandler`实现,如前面所述。

下面的示例假定存在一个分区作业,并重点关注 XML 中的`MessageChannelPartitionHandler`和 JMS 配置:

XML 配置

```
<bean id="partitionHandler"
   class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
  <property name="stepName" value="step1"/>
  <property name="gridSize" value="3"/>
  <property name="replyChannel" ref="outbound-replies"/>
  <property name="messagingOperations">
    <bean class="org.springframework.integration.core.MessagingTemplate">
      <property name="defaultChannel" ref="outbound-requests"/>
      <property name="receiveTimeout" value="100000"/>
    </bean>
  </property>
</bean>

<int:channel id="outbound-requests"/>
<int-jms:outbound-channel-adapter destination="requestsQueue"
    channel="outbound-requests"/>

<int:channel id="inbound-requests"/>
<int-jms:message-driven-channel-adapter destination="requestsQueue"
    channel="inbound-requests"/>

<bean id="stepExecutionRequestHandler"
    class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
  <property name="jobExplorer" ref="jobExplorer"/>
  <property name="stepLocator" ref="stepLocator"/>
</bean>

<int:service-activator ref="stepExecutionRequestHandler" input-channel="inbound-requests"
    output-channel="outbound-staging"/>

<int:channel id="outbound-staging"/>
<int-jms:outbound-channel-adapter destination="stagingQueue"
    channel="outbound-staging"/>

<int:channel id="inbound-staging"/>
<int-jms:message-driven-channel-adapter destination="stagingQueue"
    channel="inbound-staging"/>

<int:aggregator ref="partitionHandler" input-channel="inbound-staging"
    output-channel="outbound-replies"/>

<int:channel id="outbound-replies">
  <int:queue/>
</int:channel>

<bean id="stepLocator"
    class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />
```

下面的示例假定存在一个分区作业,并重点关注 Java 中的`MessageChannelPartitionHandler`和 JMS 配置:

Java 配置

```
/*
 * Configuration of the manager side
 */
@Bean
public PartitionHandler partitionHandler() {
    MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
    partitionHandler.setStepName("step1");
    partitionHandler.setGridSize(3);
    partitionHandler.setReplyChannel(outboundReplies());
    MessagingTemplate template = new MessagingTemplate();
    template.setDefaultChannel(outboundRequests());
    template.setReceiveTimeout(100000);
    partitionHandler.setMessagingOperations(template);
    return partitionHandler;
}

@Bean
public QueueChannel outboundReplies() {
    return new QueueChannel();
}

@Bean
public DirectChannel outboundRequests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundJmsRequests() {
    return IntegrationFlows.from("outboundRequests")
            .handle(Jms.outboundGateway(connectionFactory())
                    .requestDestination("requestsQueue"))
            .get();
}

@Bean
@ServiceActivator(inputChannel = "inboundStaging")
public AggregatorFactoryBean partitioningMessageHandler() throws Exception {
    AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
    aggregatorFactoryBean.setProcessorBean(partitionHandler());
    aggregatorFactoryBean.setOutputChannel(outboundReplies());
    // configure other propeties of the aggregatorFactoryBean
    return aggregatorFactoryBean;
}

@Bean
public DirectChannel inboundStaging() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundJmsStaging() {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory())
                    .configureListenerContainer(c -> c.subscriptionDurable(false))
                    .destination("stagingQueue"))
            .channel(inboundStaging())
            .get();
}

/*
 * Configuration of the worker side
 */
@Bean
public StepExecutionRequestHandler stepExecutionRequestHandler() {
    StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
    stepExecutionRequestHandler.setJobExplorer(jobExplorer);
    stepExecutionRequestHandler.setStepLocator(stepLocator());
    return stepExecutionRequestHandler;
}

@Bean
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler serviceActivator() throws Exception {
    return stepExecutionRequestHandler();
}

@Bean
public DirectChannel inboundRequests() {
    return new DirectChannel();
}

public IntegrationFlow inboundJmsRequests() {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory())
                    .configureListenerContainer(c -> c.subscriptionDurable(false))
                    .destination("requestsQueue"))
            .channel(inboundRequests())
            .get();
}

@Bean
public DirectChannel outboundStaging() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundJmsStaging() {
    return IntegrationFlows.from("outboundStaging")
            .handle(Jms.outboundGateway(connectionFactory())
                    .requestDestination("stagingQueue"))
            .get();
}
```

还必须确保分区`handler`属性映射到`partitionHandler` Bean。

下面的示例将分区`handler`属性映射到 XML 中的`partitionHandler`:

XML 配置

```
<job id="personJob">
  <step id="step1.manager">
    <partition partitioner="partitioner" handler="partitionHandler"/>
    ...
  </step>
</job>
```

下面的示例将分区`handler`属性映射到 Java 中的`partitionHandler`:

Java 配置

```
	public Job personJob() {
		return jobBuilderFactory.get("personJob")
				.start(stepBuilderFactory.get("step1.manager")
						.partitioner("step1.worker", partitioner())
						.partitionHandler(partitionHandler())
						.build())
				.build();
	}
```

你可以找到远程分区作业[here](https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples#remote-partitioning-sample)的完整示例。

可以用来简化远程分区设置的`@EnableBatchIntegration`注释。这个注释为远程分区提供了两个有用的 bean:

* `RemotePartitioningManagerStepBuilderFactory`:用于配置 Manager 步骤

* `RemotePartitioningWorkerStepBuilderFactory`:用于配置工作步骤

这些 API 负责配置一些组件,如下图所示:

![远程分区配置(使用作业存储库轮询)](https://docs.spring.io/spring-batch/docs/current/reference/html/images/remote-partitioning-polling-config.png)

图 6。远程分区配置(使用作业存储库轮询)

![远程分区配置(带有回复聚合)](https://docs.spring.io/spring-batch/docs/current/reference/html/images/remote-partitioning-aggregation-config.png)

图 7。远程分区配置(带有回复聚合)

在 Manager 方面,`RemotePartitioningManagerStepBuilderFactory`允许你通过声明以下内容来配置 Manager:

* 用于划分数据的`Partitioner`

* 将请求发送给工作人员的输出通道(“传出请求”)

* 输入通道(“传入回复”)以接收来自工作人员的回复(在配置回复聚合时)

* 轮询间隔和超时参数(在配置作业存储库轮询时)

不需要显式配置`MessageChannelPartitionHandler``MessagingTemplate`(如果需要,仍然可以显式配置这些参数)。

在 worker 方面,`RemotePartitioningWorkerStepBuilderFactory`允许你将 worker 配置为:

* 监听 Manager 在输入通道上发送的请求(“传入请求”)

* 对于每个请求调用`StepExecutionRequestHandler``handle`方法

* 将输出通道上的回复(“输出回复”)发送给 Manager

不需要显式配置`StepExecutionRequestHandler`(如果需要,可以显式配置)。

下面的示例展示了如何使用这些 API:

```
@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningJobConfiguration {

    @Configuration
    public static class ManagerConfiguration {

        @Autowired
        private RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;

        @Bean
        public Step managerStep() {
                 return this.managerStepBuilderFactory
                    .get("managerStep")
                    .partitioner("workerStep", partitioner())
                    .gridSize(10)
                    .outputChannel(outgoingRequestsToWorkers())
                    .inputChannel(incomingRepliesFromWorkers())
                    .build();
        }

        // Middleware beans setup omitted

    }

    @Configuration
    public static class WorkerConfiguration {

        @Autowired
        private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;

        @Bean
        public Step workerStep() {
                 return this.workerStepBuilderFactory
                    .get("workerStep")
                    .inputChannel(incomingRequestsFromManager())
                    .outputChannel(outgoingRepliesToManager())
                    .chunk(100)
                    .reader(itemReader())
                    .processor(itemProcessor())
                    .writer(itemWriter())
                    .build();
        }

        // Middleware beans setup omitted

    }

}
```