(window.webpackJsonp=window.webpackJsonp||[]).push([[388],{821:function(e,n,t){"use strict";t.r(n);var a=t(56),r=Object(a.a)({},(function(){var e=this,n=e.$createElement,t=e._self._c||n;return t("ContentSlotsDistributor",{attrs:{"slot-key":e.$parent.slotKey}},[t("h1",{attrs:{id:"spring-批处理集成"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#spring-批处理集成"}},[e._v("#")]),e._v(" Spring 批处理集成")]),e._v(" "),t("h2",{attrs:{id:"spring-批处理集成-2"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#spring-批处理集成-2"}},[e._v("#")]),e._v(" Spring 批处理集成")]),e._v(" "),t("p",[e._v("XMLJavaBoth")]),e._v(" "),t("h3",{attrs:{id:"spring-批处理集成介绍"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#spring-批处理集成介绍"}},[e._v("#")]),e._v(" Spring 批处理集成介绍")]),e._v(" "),t("p",[e._v("Spring 批处理的许多用户可能会遇到不在 Spring 批处理范围内的需求,但这些需求可以通过使用 Spring 集成来高效而简洁地实现。相反, Spring 集成用户可能会遇到 Spring 批处理需求,并且需要一种有效地集成这两个框架的方法。在这种情况下,出现了几种模式和用例, Spring 批处理集成解决了这些需求。")]),e._v(" "),t("p",[e._v("Spring 批处理和 Spring 集成之间的界限并不总是清晰的,但有两条建议可以提供帮助:考虑粒度,并应用公共模式。这些常见模式中的一些在本参考手册一节中进行了描述。")]),e._v(" "),t("p",[e._v("将消息传递添加到批处理过程中,可以实现操作的自动化,还可以分离关键关注事项并制定策略。例如,消息可能会触发作业执行,然后消息的发送可以通过多种方式公开。或者,当作业完成或失败时,该事件可能会触发要发送的消息,而这些消息的消费者可能有与应用程序本身无关的操作问题。消息传递也可以嵌入到作业中(例如,通过通道读取或写入用于处理的项)。远程分区和远程分块提供了在多个工作人员上分配工作负载的方法。")]),e._v(" "),t("p",[e._v("本节涵盖以下关键概念:")]),e._v(" "),t("ul",[t("li",[t("p",[t("a",{attrs:{href:"#namespace-support"}},[e._v("命名空间支持")])])]),e._v(" "),t("li",[t("p",[t("a",{attrs:{href:"#launching-batch-jobs-through-messages"}},[e._v("通过消息启动批处理作业")])])]),e._v(" "),t("li",[t("p",[t("a",{attrs:{href:"#providing-feedback-with-informational-messages"}},[e._v("提供反馈信息")])])]),e._v(" "),t("li",[t("p",[t("a",{attrs:{href:"#asynchronous-processors"}},[e._v("异步处理器")])])]),e._v(" "),t("li",[t("p",[t("a",{attrs:{href:"#externalizing-batch-process-execution"}},[e._v("外部化批处理过程执行")])])])]),e._v(" "),t("h4",{attrs:{id:"名称空间支持"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#名称空间支持"}},[e._v("#")]),e._v(" 名称空间支持")]),e._v(" "),t("p",[e._v("Spring 自批处理集成 1.3 以来,添加了专用的 XML 命名空间支持,目的是提供更简单的配置体验。为了激活命名空间,请将以下命名空间声明添加到 Spring XML 应用程序上下文文件中:")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\n\n ...\n\n\n')])])]),t("p",[e._v("用于 Spring 批处理集成的完全配置的 Spring XML 应用程序上下文文件可能如下所示:")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\n\n ...\n\n\n')])])]),t("p",[e._v("也允许将版本号附加到引用的 XSD 文件中,但是,由于无版本声明总是使用最新的模式,因此我们通常不建议将版本号附加到 XSD 名称中。添加版本号可能会在更新 Spring 批处理集成依赖项时产生问题,因为它们可能需要 XML 模式的最新版本。")]),e._v(" "),t("h4",{attrs:{id:"通过消息启动批处理作业"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#通过消息启动批处理作业"}},[e._v("#")]),e._v(" 通过消息启动批处理作业")]),e._v(" "),t("p",[e._v("当通过使用核心 Spring 批处理 API 启动批处理作业时,你基本上有两个选项:")]),e._v(" "),t("ul",[t("li",[t("p",[e._v("在命令行中,使用"),t("code",[e._v("CommandLineJobRunner")])])]),e._v(" "),t("li",[t("p",[e._v("在编程上,使用"),t("code",[e._v("JobOperator.start()")]),e._v("或"),t("code",[e._v("JobLauncher.run()")])])])]),e._v(" "),t("p",[e._v("例如,当通过使用 shell 脚本调用批处理作业时,你可能希望使用"),t("code",[e._v("CommandLineJobRunner")]),e._v("。或者,你可以直接使用"),t("code",[e._v("JobOperator")]),e._v("(例如,当使用 Spring 批处理作为 Web 应用程序的一部分时)。然而,更复杂的用例呢?也许你需要轮询远程 FTP 服务器来检索批处理作业的数据,或者你的应用程序必须同时支持多个不同的数据源。例如,你不仅可以从 Web 接收数据文件,还可以从 FTP 和其他来源接收数据文件。在调用 Spring 批处理之前,可能需要对输入文件进行额外的转换。")]),e._v(" "),t("p",[e._v("因此,使用 Spring 集成及其众多适配器来执行批处理作业将会强大得多。例如,你可以使用"),t("em",[e._v("文件入站通道适配器")]),e._v("来监视文件系统中的一个目录,并在输入文件到达时立即启动批处理作业。此外,你还可以创建 Spring 集成流,这些集成流使用多个不同的适配器,仅使用配置就可以轻松地从多个源同时为批处理作业摄取数据。使用 Spring 集成实现所有这些场景是很容易的,因为它允许对"),t("code",[e._v("JobLauncher")]),e._v("进行解耦、事件驱动的执行。")]),e._v(" "),t("p",[e._v("Spring 批处理集成提供了"),t("code",[e._v("JobLaunchingMessageHandler")]),e._v("类,你可以使用它来启动批处理作业。"),t("code",[e._v("JobLaunchingMessageHandler")]),e._v("的输入由 Spring 集成消息提供,该消息的有效负载类型为"),t("code",[e._v("JobLaunchRequest")]),e._v("。这个类是围绕需要启动的"),t("code",[e._v("Job")]),e._v("和启动批处理作业所必需的"),t("code",[e._v("JobParameters")]),e._v("的包装器。")]),e._v(" "),t("p",[e._v("下面的图像演示了典型的 Spring 集成消息流,以便启动批处理作业。"),t("a",{attrs:{href:"https://www.enterpriseintegrationpatterns.com/toc.html",target:"_blank",rel:"noopener noreferrer"}},[e._v("Enterprise 集成模式网站"),t("OutboundLink")],1),e._v("提供了消息传递图标及其描述的完整概述。")]),e._v(" "),t("p",[t("img",{attrs:{src:"https://docs.spring.io/spring-batch/docs/current/reference/html/images/launch-batch-job.png",alt:"启动批处理作业"}})]),e._v(" "),t("p",[e._v("图 1。启动批处理作业")]),e._v(" "),t("h5",{attrs:{id:"将文件转换为-joblaunchrequest"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#将文件转换为-joblaunchrequest"}},[e._v("#")]),e._v(" 将文件转换为 joblaunchrequest")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("package io.spring.sbi;\n\nimport org.springframework.batch.core.Job;\nimport org.springframework.batch.core.JobParametersBuilder;\nimport org.springframework.batch.integration.launch.JobLaunchRequest;\nimport org.springframework.integration.annotation.Transformer;\nimport org.springframework.messaging.Message;\n\nimport java.io.File;\n\npublic class FileMessageToJobRequest {\n private Job job;\n private String fileParameterName;\n\n public void setFileParameterName(String fileParameterName) {\n this.fileParameterName = fileParameterName;\n }\n\n public void setJob(Job job) {\n this.job = job;\n }\n\n @Transformer\n public JobLaunchRequest toRequest(Message message) {\n JobParametersBuilder jobParametersBuilder =\n new JobParametersBuilder();\n\n jobParametersBuilder.addString(fileParameterName,\n message.getPayload().getAbsolutePath());\n\n return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());\n }\n}\n")])])]),t("h5",{attrs:{id:"thejobexecution响应"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#thejobexecution响应"}},[e._v("#")]),e._v(" the"),t("code",[e._v("JobExecution")]),e._v("响应")]),e._v(" "),t("p",[e._v("当执行批处理作业时,将返回一个"),t("code",[e._v("JobExecution")]),e._v("实例。此实例可用于确定执行的状态。如果"),t("code",[e._v("JobExecution")]),e._v("能够成功创建,则无论实际执行是否成功,它总是被返回。")]),e._v(" "),t("p",[e._v("如何返回"),t("code",[e._v("JobExecution")]),e._v("实例的确切行为取决于所提供的"),t("code",[e._v("TaskExecutor")]),e._v("。如果使用"),t("code",[e._v("synchronous")]),e._v("(单线程)"),t("code",[e._v("TaskExecutor")]),e._v("实现,则只返回"),t("code",[e._v("JobExecution")]),e._v("响应"),t("code",[e._v("after")]),e._v("作业完成。当使用"),t("code",[e._v("asynchronous``TaskExecutor")]),e._v("时,将立即返回"),t("code",[e._v("JobExecution")]),e._v("实例。然后,用户可以使用"),t("code",[e._v("JobExecution")]),e._v("的"),t("code",[e._v("id")]),e._v("实例(带有"),t("code",[e._v("JobExecution.getJobId()")]),e._v("),并使用"),t("code",[e._v("JobExplorer")]),e._v("查询"),t("code",[e._v("JobRepository")]),e._v("中的作业更新状态。有关更多信息,请参阅关于"),t("RouterLink",{attrs:{to:"/spring-batch/job.html#queryingRepository"}},[e._v("查询存储库")]),e._v("的 Spring 批参考文档。")],1),e._v(" "),t("h5",{attrs:{id:"spring-批处理集成配置"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#spring-批处理集成配置"}},[e._v("#")]),e._v(" Spring 批处理集成配置")]),e._v(" "),t("p",[e._v("考虑这样一种情况:需要创建一个文件"),t("code",[e._v("inbound-channel-adapter")]),e._v("来监听所提供的目录中的 CSV 文件,将它们交给转换器("),t("code",[e._v("FileMessageToJobRequest")]),e._v("),通过"),t("em",[e._v("工作启动网关")]),e._v("启动作业,然后用"),t("code",[e._v("logging-channel-adapter")]),e._v("记录"),t("code",[e._v("JobExecution")]),e._v("的输出。")]),e._v(" "),t("p",[e._v("下面的示例展示了如何在 XML 中配置这种常见的情况:")]),e._v(" "),t("p",[e._v("XML 配置")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\n\n\n\n\n \n\n\n\n \n \n \n \n\n\n\n\n\n')])])]),t("p",[e._v("下面的示例展示了如何在 Java 中配置这种常见情况:")]),e._v(" "),t("p",[e._v("Java 配置")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('@Bean\npublic FileMessageToJobRequest fileMessageToJobRequest() {\n FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();\n fileMessageToJobRequest.setFileParameterName("input.file.name");\n fileMessageToJobRequest.setJob(personJob());\n return fileMessageToJobRequest;\n}\n\n@Bean\npublic JobLaunchingGateway jobLaunchingGateway() {\n SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();\n simpleJobLauncher.setJobRepository(jobRepository);\n simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());\n JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);\n\n return jobLaunchingGateway;\n}\n\n@Bean\npublic IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {\n return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/myfiles")).\n filter(new SimplePatternFileListFilter("*.csv")),\n c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).\n transform(fileMessageToJobRequest()).\n handle(jobLaunchingGateway).\n log(LoggingHandler.Level.WARN, "headers.id + \': \' + payload").\n get();\n}\n')])])]),t("h5",{attrs:{id:"示例-itemreader-配置"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#示例-itemreader-配置"}},[e._v("#")]),e._v(" 示例 itemreader 配置")]),e._v(" "),t("p",[e._v("现在我们正在轮询文件和启动作业,我们需要配置我们的 Spring 批处理"),t("code",[e._v("ItemReader")]),e._v("(例如),以使用在名为“input.file.name”的作业参数所定义的位置找到的文件,如下面的 Bean 配置所示:")]),e._v(" "),t("p",[e._v("下面的 XML 示例展示了必要的配置 Bean:")]),e._v(" "),t("p",[e._v("XML 配置")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\n \n ...\n\n')])])]),t("p",[e._v("下面的 Java 示例展示了必要的配置 Bean:")]),e._v(" "),t("p",[e._v("Java 配置")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('@Bean\n@StepScope\npublic ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {\n...\n FlatFileItemReader flatFileItemReader = new FlatFileItemReader();\n flatFileItemReader.setResource(new FileSystemResource(resource));\n...\n return flatFileItemReader;\n}\n')])])]),t("p",[e._v("在前面的示例中,主要的关注点是注入"),t("code",[e._v("#{jobParameters['input.file.name']}")]),e._v("的值作为资源属性值,并将"),t("code",[e._v("ItemReader")]),e._v(" Bean 设置为具有"),t("em",[e._v("步骤范围")]),e._v("。将 Bean 设置为具有步骤作用域利用了后期绑定支持,这允许访问"),t("code",[e._v("jobParameters")]),e._v("变量。")]),e._v(" "),t("h3",{attrs:{id:"作业启动网关的可用属性"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#作业启动网关的可用属性"}},[e._v("#")]),e._v(" 作业启动网关的可用属性")]),e._v(" "),t("p",[e._v("作业启动网关具有以下属性,你可以设置这些属性来控制作业:")]),e._v(" "),t("ul",[t("li",[t("p",[t("code",[e._v("id")]),e._v(":标识底层的 Spring Bean 定义,它是以下两种定义之一的实例:")]),e._v(" "),t("ul",[t("li",[t("p",[t("code",[e._v("EventDrivenConsumer")])])]),e._v(" "),t("li",[t("p",[t("code",[e._v("PollingConsumer")]),e._v("(准确的实现取决于组件的输入通道是"),t("code",[e._v("SubscribableChannel")]),e._v("还是"),t("code",[e._v("PollableChannel")]),e._v("。)")])])])]),e._v(" "),t("li",[t("p",[t("code",[e._v("auto-startup")]),e._v(":布尔标志,指示端点在启动时应自动启动。默认值为"),t("em",[e._v("true")]),e._v("。")])]),e._v(" "),t("li",[t("p",[t("code",[e._v("request-channel")]),e._v(":此端点的输入"),t("code",[e._v("MessageChannel")]),e._v("。")])]),e._v(" "),t("li",[t("p",[t("code",[e._v("reply-channel")]),e._v(":"),t("code",[e._v("MessageChannel")]),e._v("将结果"),t("code",[e._v("JobExecution")]),e._v("的有效载荷发送到该负载。")])]),e._v(" "),t("li",[t("p",[t("code",[e._v("reply-timeout")]),e._v(":允许你指定此网关在抛出异常之前等待多长时间(以毫秒为单位)以将答复消息成功发送到答复通道。此属性仅在通道可能阻塞时才应用(例如,当使用当前已满的有界队列通道时)。另外,请记住,当发送到"),t("code",[e._v("DirectChannel")]),e._v("时,调用发生在发送方的线程中。因此,发送操作的失败可能是由更下游的其他组件引起的。"),t("code",[e._v("reply-timeout")]),e._v("属性映射到底层"),t("code",[e._v("sendTimeout")]),e._v("实例的"),t("code",[e._v("sendTimeout")]),e._v("属性。如果没有指定,则属性默认为 -1,这意味着,默认情况下,"),t("code",[e._v("Gateway")]),e._v("无限期地等待。")])]),e._v(" "),t("li",[t("p",[t("code",[e._v("job-launcher")]),e._v(":可选。接受自定义"),t("code",[e._v("JobLauncher")]),e._v(" Bean 引用。如果没有指定适配器,则重新使用在"),t("code",[e._v("jobLauncher")]),e._v("的"),t("code",[e._v("id")]),e._v("下注册的实例。如果不存在缺省实例,则抛出一个异常。")])]),e._v(" "),t("li",[t("p",[t("code",[e._v("order")]),e._v(":指定当此端点作为订阅服务器连接到"),t("code",[e._v("SubscribableChannel")]),e._v("时的调用顺序。")])])]),e._v(" "),t("h3",{attrs:{id:"子元素"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#子元素"}},[e._v("#")]),e._v(" 子元素")]),e._v(" "),t("p",[e._v("当"),t("code",[e._v("Gateway")]),e._v("接收来自"),t("code",[e._v("PollableChannel")]),e._v("的消息时,你必须为"),t("code",[e._v("Poller")]),e._v("提供一个全局默认值"),t("code",[e._v("Poller")]),e._v(",或者为"),t("code",[e._v("Job Launching Gateway")]),e._v("提供一个子元素。")]),e._v(" "),t("p",[e._v("下面的示例展示了如何用 XML 提供一个 Poller:")]),e._v(" "),t("p",[e._v("XML 配置")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\n \n\n')])])]),t("p",[e._v("下面的示例展示了如何用 Java 提供一个 Poller:")]),e._v(" "),t("p",[e._v("Java 配置")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('@Bean\n@ServiceActivator(inputChannel = "queueChannel", poller = @Poller(fixedRate="1000"))\npublic JobLaunchingGateway sampleJobLaunchingGateway() {\n JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher());\n jobLaunchingGateway.setOutputChannel(replyChannel());\n return jobLaunchingGateway;\n}\n')])])]),t("h4",{attrs:{id:"提供反馈信息"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#提供反馈信息"}},[e._v("#")]),e._v(" 提供反馈信息")]),e._v(" "),t("p",[e._v("Spring 由于批处理作业可以运行很长时间,因此提供进度信息通常是至关重要的。例如,如果批处理作业的某些部分或所有部分都失败了,利益相关者可能希望得到通知。 Spring 批处理为正在通过以下方式收集的此信息提供支持:")]),e._v(" "),t("ul",[t("li",[t("p",[e._v("活动轮询")])]),e._v(" "),t("li",[t("p",[e._v("事件驱动侦听器")])])]),e._v(" "),t("p",[e._v("在异步启动 Spring 批处理作业时(例如,通过使用"),t("code",[e._v("Job Launching Gateway")]),e._v("),将返回一个"),t("code",[e._v("JobExecution")]),e._v("实例。因此,通过使用"),t("code",[e._v("JobExplorer")]),e._v("从"),t("code",[e._v("JobRepository")]),e._v("检索"),t("code",[e._v("JobExecution")]),e._v("的更新实例,"),t("code",[e._v("JobExecution.getJobId()")]),e._v("可用于连续轮询状态更新。然而,这被认为是次优的,事件驱动的方法应该是首选的。")]),e._v(" "),t("p",[e._v("因此, Spring 批提供了侦听器,包括三个最常用的侦听器:")]),e._v(" "),t("ul",[t("li",[t("p",[t("code",[e._v("StepListener")])])]),e._v(" "),t("li",[t("p",[t("code",[e._v("ChunkListener")])])]),e._v(" "),t("li",[t("p",[t("code",[e._v("JobExecutionListener")])])])]),e._v(" "),t("p",[e._v("在下图所示的示例中, Spring 批处理作业已配置为"),t("code",[e._v("StepExecutionListener")]),e._v("。因此, Spring 集成接收并处理事件之前或之后的任何步骤。例如,接收到的"),t("code",[e._v("StepExecution")]),e._v("可以通过使用"),t("code",[e._v("Router")]),e._v("进行检查。基于该检查的结果,可以发生各种事情(例如将消息路由到邮件出站通道适配器),以便可以基于某些条件发送出电子邮件通知。")]),e._v(" "),t("p",[t("img",{attrs:{src:"https://docs.spring.io/spring-batch/docs/current/reference/html/images/handling-informational-messages.png",alt:"处理信息消息"}})]),e._v(" "),t("p",[e._v("图 2。处理信息消息")]),e._v(" "),t("p",[e._v("下面由两部分组成的示例展示了侦听器如何配置为向"),t("code",[e._v("Gateway")]),e._v("事件发送消息到"),t("code",[e._v("StepExecution")]),e._v(",并将其输出记录到"),t("code",[e._v("logging-channel-adapter")]),e._v("。")]),e._v(" "),t("p",[e._v("首先,创建通知集成 bean。")]),e._v(" "),t("p",[e._v("下面的示例展示了如何在 XML 中创建通知集成 bean:")]),e._v(" "),t("p",[e._v("XML 配置")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\n\n\n\n\n')])])]),t("p",[e._v("下面的示例展示了如何在 Java 中创建通知集成 bean:")]),e._v(" "),t("p",[e._v("Java 配置")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('@Bean\n@ServiceActivator(inputChannel = "stepExecutionsChannel")\npublic LoggingHandler loggingHandler() {\n LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.WARN);\n adapter.setLoggerName("TEST_LOGGER");\n adapter.setLogExpressionString("headers.id + \': \' + payload");\n return adapter;\n}\n\n@MessagingGateway(name = "notificationExecutionsListener", defaultRequestChannel = "stepExecutionsChannel")\npublic interface NotificationExecutionListener extends StepExecutionListener {}\n')])])]),t("table",[t("thead",[t("tr",[t("th"),e._v(" "),t("th",[e._v("你需要将"),t("code",[e._v("@IntegrationComponentScan")]),e._v("注释添加到配置中。")])])]),e._v(" "),t("tbody")]),e._v(" "),t("p",[e._v("其次,修改工作以添加一个步骤级侦听器。")]),e._v(" "),t("p",[e._v("下面的示例展示了如何在 XML 中添加一个步骤级侦听器:")]),e._v(" "),t("p",[e._v("XML 配置")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\n \n \n \n \n \n \n \n ...\n \n\n')])])]),t("p",[e._v("下面的示例展示了如何在 Java 中添加一个步骤级侦听器:")]),e._v(" "),t("p",[e._v("Java 配置")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('public Job importPaymentsJob() {\n return jobBuilderFactory.get("importPayments")\n .start(stepBuilderFactory.get("step1")\n .chunk(200)\n .listener(notificationExecutionsListener())\n ...\n}\n')])])]),t("h4",{attrs:{id:"异步处理器"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#异步处理器"}},[e._v("#")]),e._v(" 异步处理器")]),e._v(" "),t("p",[e._v("异步处理器帮助你扩展项目的处理。在异步处理器用例中,"),t("code",[e._v("AsyncItemProcessor")]),e._v("充当调度器,为新线程上的项执行"),t("code",[e._v("ItemProcessor")]),e._v("的逻辑。项目完成后,将"),t("code",[e._v("Future")]),e._v("传递给要写入的"),t("code",[e._v("AsynchItemWriter")]),e._v("。")]),e._v(" "),t("p",[e._v("因此,你可以通过使用异步项目处理来提高性能,基本上允许你实现*fork-join *场景。"),t("code",[e._v("AsyncItemWriter")]),e._v("收集结果,并在所有结果可用时立即写回块。")]),e._v(" "),t("p",[e._v("下面的示例展示了如何在 XML 中配置"),t("code",[e._v("AsyncItemProcessor")]),e._v(":")]),e._v(" "),t("p",[e._v("XML 配置")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\n \n \n \n \n \n \n\n')])])]),t("p",[e._v("下面的示例展示了如何在 XML 中配置"),t("code",[e._v("AsyncItemProcessor")]),e._v(":")]),e._v(" "),t("p",[e._v("Java 配置")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("@Bean\npublic AsyncItemProcessor processor(ItemProcessor itemProcessor, TaskExecutor taskExecutor) {\n AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor();\n asyncItemProcessor.setTaskExecutor(taskExecutor);\n asyncItemProcessor.setDelegate(itemProcessor);\n return asyncItemProcessor;\n}\n")])])]),t("p",[t("code",[e._v("delegate")]),e._v("属性是指你的"),t("code",[e._v("ItemProcessor")]),e._v(" Bean,而"),t("code",[e._v("taskExecutor")]),e._v("属性是指你选择的"),t("code",[e._v("TaskExecutor")]),e._v("。")]),e._v(" "),t("p",[e._v("下面的示例展示了如何在 XML 中配置"),t("code",[e._v("AsyncItemWriter")]),e._v(":")]),e._v(" "),t("p",[e._v("XML 配置")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\n \n \n \n\n')])])]),t("p",[e._v("下面的示例展示了如何在 Java 中配置"),t("code",[e._v("AsyncItemWriter")]),e._v(":")]),e._v(" "),t("p",[e._v("Java 配置")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("@Bean\npublic AsyncItemWriter writer(ItemWriter itemWriter) {\n AsyncItemWriter asyncItemWriter = new AsyncItemWriter();\n asyncItemWriter.setDelegate(itemWriter);\n return asyncItemWriter;\n}\n")])])]),t("p",[e._v("同样,"),t("code",[e._v("delegate")]),e._v("属性实际上是对你的"),t("code",[e._v("ItemWriter")]),e._v(" Bean 的引用。")]),e._v(" "),t("h4",{attrs:{id:"外部化批处理过程执行"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#外部化批处理过程执行"}},[e._v("#")]),e._v(" 外部化批处理过程执行")]),e._v(" "),t("p",[e._v("到目前为止讨论的集成方法建议使用 Spring 集成像外壳一样包装 Spring 批处理的用例。然而, Spring 批处理也可以在内部使用 Spring 集成。 Spring 使用这种方法,批处理用户可以将项目甚至块的处理委托给外部进程。这允许你卸载复杂的处理。 Spring 批处理集成为以下方面提供了专门的支持:")]),e._v(" "),t("ul",[t("li",[t("p",[e._v("远程分块")])]),e._v(" "),t("li",[t("p",[e._v("远程分区")])])]),e._v(" "),t("h5",{attrs:{id:"远程分块"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#远程分块"}},[e._v("#")]),e._v(" 远程分块")]),e._v(" "),t("p",[t("img",{attrs:{src:"https://docs.spring.io/spring-batch/docs/current/reference/html/images/remote-chunking-sbi.png",alt:"远程分块"}})]),e._v(" "),t("p",[e._v("图 3。远程分块")]),e._v(" "),t("p",[e._v("更进一步,还可以使用"),t("code",[e._v("ChunkMessageChannelItemWriter")]),e._v("(由 Spring Batch Integration 提供)将块处理外部化,它将项发送出去并收集结果。一旦发送, Spring 批处理继续读取和分组项的过程,而无需等待结果。相反,收集结果并将其集成回 Spring 批处理过程是"),t("code",[e._v("ChunkMessageChannelItemWriter")]),e._v("的责任。")]),e._v(" "),t("p",[e._v("通过 Spring 集成,你可以完全控制进程的并发性(例如,通过使用"),t("code",[e._v("QueueChannel")]),e._v("而不是"),t("code",[e._v("DirectChannel")]),e._v(")。此外,通过依赖 Spring Integration 的通道适配器(例如 JMS 和 AMQP)的丰富集合,你可以将批处理作业的块分配给外部系统进行处理。")]),e._v(" "),t("p",[e._v("带有要远程分块的步骤的作业可能具有类似于 XML 中的以下配置:")]),e._v(" "),t("p",[e._v("XML 配置")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\n \n \n \n \n ...\n \n\n')])])]),t("p",[e._v("带有要远程分块的步骤的作业可能具有类似于 Java 中的以下配置:")]),e._v(" "),t("p",[e._v("Java 配置")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('public Job chunkJob() {\n return jobBuilderFactory.get("personJob")\n .start(stepBuilderFactory.get("step1")\n .chunk(200)\n .reader(itemReader())\n .writer(itemWriter())\n .build())\n .build();\n }\n')])])]),t("p",[t("code",[e._v("ItemReader")]),e._v("引用指向要用于读取 Manager 上的数据的 Bean。正如上面所描述的,"),t("code",[e._v("ItemWriter")]),e._v("引用指向一个特殊的"),t("code",[e._v("ItemWriter")]),e._v("(称为"),t("code",[e._v("ChunkMessageChannelItemWriter")]),e._v(")。处理器(如果有的话)不在 Manager 配置中,因为它是在 Worker 上配置的。在实现用例时,你应该检查任何附加的组件属性,例如油门限制等。")]),e._v(" "),t("p",[e._v("以下 XML 配置提供了基本的 Manager 设置:")]),e._v(" "),t("p",[e._v("XML 配置")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\n \n\n\n\n\n\n \n \n\n\n\n \n \n\n\n\n \n\n\n\n')])])]),t("p",[e._v("下面的 Java 配置提供了一个基本的 Manager 设置:")]),e._v(" "),t("p",[e._v("Java 配置")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('@Bean\npublic org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {\n ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();\n factory.setBrokerURL("tcp://localhost:61616");\n return factory;\n}\n\n/*\n * Configure outbound flow (requests going to workers)\n */\n@Bean\npublic DirectChannel requests() {\n return new DirectChannel();\n}\n\n@Bean\npublic IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {\n return IntegrationFlows\n .from(requests())\n .handle(Jms.outboundAdapter(connectionFactory).destination("requests"))\n .get();\n}\n\n/*\n * Configure inbound flow (replies coming from workers)\n */\n@Bean\npublic QueueChannel replies() {\n return new QueueChannel();\n}\n\n@Bean\npublic IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {\n return IntegrationFlows\n .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))\n .channel(replies())\n .get();\n}\n\n/*\n * Configure the ChunkMessageChannelItemWriter\n */\n@Bean\npublic ItemWriter itemWriter() {\n MessagingTemplate messagingTemplate = new MessagingTemplate();\n messagingTemplate.setDefaultChannel(requests());\n messagingTemplate.setReceiveTimeout(2000);\n ChunkMessageChannelItemWriter chunkMessageChannelItemWriter\n = new ChunkMessageChannelItemWriter<>();\n chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);\n chunkMessageChannelItemWriter.setReplyChannel(replies());\n return chunkMessageChannelItemWriter;\n}\n')])])]),t("p",[e._v("前面的配置为我们提供了许多 bean。我们使用 ActiveMQ 和 Spring Integration 提供的入站/出站 JMS 适配器配置消息传递中间件。如图所示,我们的作业步骤引用的"),t("code",[e._v("itemWriter")]),e._v(" Bean 使用"),t("code",[e._v("ChunkMessageChannelItemWriter")]),e._v("在配置的中间件上写块。")]),e._v(" "),t("p",[e._v("现在我们可以转到 Worker 配置,如下面的示例所示:")]),e._v(" "),t("p",[e._v("下面的示例显示了 XML 中的工作配置:")]),e._v(" "),t("p",[e._v("XML 配置")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\n \n\n\n\n\n\n\n\n\n\n\n\n\n\n \n \n \n \n \n \n \n \n \n \n\n')])])]),t("p",[e._v("下面的示例展示了 Java 中的 worker 配置:")]),e._v(" "),t("p",[e._v("Java 配置")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('@Bean\npublic org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {\n ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();\n factory.setBrokerURL("tcp://localhost:61616");\n return factory;\n}\n\n/*\n * Configure inbound flow (requests coming from the manager)\n */\n@Bean\npublic DirectChannel requests() {\n return new DirectChannel();\n}\n\n@Bean\npublic IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {\n return IntegrationFlows\n .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))\n .channel(requests())\n .get();\n}\n\n/*\n * Configure outbound flow (replies going to the manager)\n */\n@Bean\npublic DirectChannel replies() {\n return new DirectChannel();\n}\n\n@Bean\npublic IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {\n return IntegrationFlows\n .from(replies())\n .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))\n .get();\n}\n\n/*\n * Configure the ChunkProcessorChunkHandler\n */\n@Bean\n@ServiceActivator(inputChannel = "requests", outputChannel = "replies")\npublic ChunkProcessorChunkHandler chunkProcessorChunkHandler() {\n ChunkProcessor chunkProcessor\n = new SimpleChunkProcessor<>(itemProcessor(), itemWriter());\n ChunkProcessorChunkHandler chunkProcessorChunkHandler\n = new ChunkProcessorChunkHandler<>();\n chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);\n return chunkProcessorChunkHandler;\n}\n')])])]),t("p",[e._v("这些配置项中的大多数应该在 Manager 配置中看起来很熟悉。工作人员不需要访问 Spring 批"),t("code",[e._v("JobRepository")]),e._v(",也不需要访问实际的作业配置文件。主要的 Bean 兴趣是"),t("code",[e._v("chunkProcessorChunkHandler")]),e._v("。"),t("code",[e._v("ChunkProcessorChunkHandler")]),e._v("的"),t("code",[e._v("chunkProcessor")]),e._v("属性接受一个已配置的"),t("code",[e._v("SimpleChunkProcessor")]),e._v(",在该属性中,你将提供对你的"),t("code",[e._v("ItemWriter")]),e._v("(以及你的"),t("code",[e._v("ItemProcessor")]),e._v(")的引用,该引用将在 worker 从 Manager 接收块时在其上运行。")]),e._v(" "),t("p",[e._v("有关更多信息,请参见"),t("a",{attrs:{href:"https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html#remoteChunking",target:"_blank",rel:"noopener noreferrer"}},[e._v("远程分块"),t("OutboundLink")],1),e._v("的“可伸缩性”章节。")]),e._v(" "),t("p",[e._v("从版本 4.1 开始, Spring 批处理集成引入了"),t("code",[e._v("@EnableBatchIntegration")]),e._v("注释,该注释可用于简化远程分块设置。这个注释提供了两个可以在应用程序上下文中自动连接的 bean:")]),e._v(" "),t("ul",[t("li",[t("p",[t("code",[e._v("RemoteChunkingManagerStepBuilderFactory")]),e._v(":用于配置 Manager 步骤")])]),e._v(" "),t("li",[t("p",[t("code",[e._v("RemoteChunkingWorkerBuilder")]),e._v(":用于配置远程工作者集成流")])])]),e._v(" "),t("p",[e._v("这些 API 负责配置一些组件,如下图所示:")]),e._v(" "),t("p",[t("img",{attrs:{src:"https://docs.spring.io/spring-batch/docs/current/reference/html/images/remote-chunking-config.png",alt:"远程组块配置"}})]),e._v(" "),t("p",[e._v("图 4。远程组块配置")]),e._v(" "),t("p",[e._v("在 Manager 方面,"),t("code",[e._v("RemoteChunkingManagerStepBuilderFactory")]),e._v("允许你通过声明以下内容来配置 Manager:")]),e._v(" "),t("ul",[t("li",[t("p",[e._v("项目阅读器读取项目并将其发送给工人")])]),e._v(" "),t("li",[t("p",[e._v("将请求发送给工作人员的输出通道(“传出请求”)")])]),e._v(" "),t("li",[t("p",[e._v("接收工作人员回复的输入通道(“传入回复”)")])])]),e._v(" "),t("p",[e._v("a"),t("code",[e._v("ChunkMessageChannelItemWriter")]),e._v("和"),t("code",[e._v("MessagingTemplate")]),e._v("不需要显式配置(如果需要,仍然可以显式配置这些参数)。")]),e._v(" "),t("p",[e._v("在 worker 方面,"),t("code",[e._v("RemoteChunkingWorkerBuilder")]),e._v("允许你将 worker 配置为:")]),e._v(" "),t("ul",[t("li",[t("p",[e._v("监听 Manager 在输入通道上发送的请求(“传入请求”)")])]),e._v(" "),t("li",[t("p",[e._v("对于配置了"),t("code",[e._v("ItemProcessor")]),e._v("和"),t("code",[e._v("ItemWriter")]),e._v("的每个请求,调用"),t("code",[e._v("handleChunk")]),e._v("的"),t("code",[e._v("ChunkProcessorChunkHandler")]),e._v("方法")])]),e._v(" "),t("li",[t("p",[e._v("将输出通道上的回复(“输出回复”)发送给 Manager")])])]),e._v(" "),t("p",[e._v("不需要显式地配置"),t("code",[e._v("SimpleChunkProcessor")]),e._v("和"),t("code",[e._v("ChunkProcessorChunkHandler")]),e._v("(如果需要,可以显式地配置这些参数)。")]),e._v(" "),t("p",[e._v("下面的示例展示了如何使用这些 API:")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('@EnableBatchIntegration\n@EnableBatchProcessing\npublic class RemoteChunkingJobConfiguration {\n\n @Configuration\n public static class ManagerConfiguration {\n\n @Autowired\n private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;\n\n @Bean\n public TaskletStep managerStep() {\n return this.managerStepBuilderFactory.get("managerStep")\n .chunk(100)\n .reader(itemReader())\n .outputChannel(requests()) // requests sent to workers\n .inputChannel(replies()) // replies received from workers\n .build();\n }\n\n // Middleware beans setup omitted\n\n }\n\n @Configuration\n public static class WorkerConfiguration {\n\n @Autowired\n private RemoteChunkingWorkerBuilder workerBuilder;\n\n @Bean\n public IntegrationFlow workerFlow() {\n return this.workerBuilder\n .itemProcessor(itemProcessor())\n .itemWriter(itemWriter())\n .inputChannel(requests()) // requests received from the manager\n .outputChannel(replies()) // replies sent to the manager\n .build();\n }\n\n // Middleware beans setup omitted\n\n }\n\n}\n')])])]),t("p",[e._v("你可以找到远程分块作业"),t("a",{attrs:{href:"https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples#remote-chunking-sample",target:"_blank",rel:"noopener noreferrer"}},[e._v("here"),t("OutboundLink")],1),e._v("的完整示例。")]),e._v(" "),t("h5",{attrs:{id:"远程分区"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#远程分区"}},[e._v("#")]),e._v(" 远程分区")]),e._v(" "),t("p",[t("img",{attrs:{src:"https://docs.spring.io/spring-batch/docs/current/reference/html/images/remote-partitioning.png",alt:"远程分区"}})]),e._v(" "),t("p",[e._v("图 5。远程分区")]),e._v(" "),t("p",[e._v("另一方面,当导致瓶颈的不是项目的处理,而是相关的 I/O 时,远程分区是有用的。使用远程分区,可以将工作分配给执行完整 Spring 批处理步骤的工作人员。因此,每个工作者都有自己的"),t("code",[e._v("ItemReader")]),e._v("、"),t("code",[e._v("ItemProcessor")]),e._v("和"),t("code",[e._v("ItemWriter")]),e._v("。为此, Spring 批处理集成提供了"),t("code",[e._v("MessageChannelPartitionHandler")]),e._v("。")]),e._v(" "),t("p",[e._v("这个"),t("code",[e._v("PartitionHandler")]),e._v("接口的实现使用"),t("code",[e._v("MessageChannel")]),e._v("实例向远程工作者发送指令并接收他们的响应。这从用于与远程工作者通信的传输(例如 JMS 和 AMQP)中提供了一个很好的抽象。")]),e._v(" "),t("p",[e._v("“可伸缩性”章节中涉及"),t("RouterLink",{attrs:{to:"/spring-batch/scalability.html#partitioning"}},[e._v("远程分区")]),e._v("的部分概述了配置远程分区所需的概念和组件,并展示了使用默认"),t("code",[e._v("TaskExecutorPartitionHandler")]),e._v("在单独的本地执行线程中进行分区的示例。要对多个 JVM 进行远程分区,还需要另外两个组件:")],1),e._v(" "),t("ul",[t("li",[t("p",[e._v("一种远程的织物或网格环境")])]),e._v(" "),t("li",[t("p",[e._v("支持所需的远程架构或网格环境的"),t("code",[e._v("PartitionHandler")]),e._v("实现")])])]),e._v(" "),t("p",[e._v("与远程组块类似,JMS 可以用作“远程组块结构”。在这种情况下,使用"),t("code",[e._v("MessageChannelPartitionHandler")]),e._v("实例作为"),t("code",[e._v("PartitionHandler")]),e._v("实现,如前面所述。")]),e._v(" "),t("p",[e._v("下面的示例假定存在一个分区作业,并重点关注 XML 中的"),t("code",[e._v("MessageChannelPartitionHandler")]),e._v("和 JMS 配置:")]),e._v(" "),t("p",[e._v("XML 配置")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\n \n \n \n \n \n \n \n \n \n\n\n\n\n\n\n\n\n\n \n \n\n\n\n\n\n\n\n\n\n\n\n\n\n \n\n\n\n')])])]),t("p",[e._v("下面的示例假定存在一个分区作业,并重点关注 Java 中的"),t("code",[e._v("MessageChannelPartitionHandler")]),e._v("和 JMS 配置:")]),e._v(" "),t("p",[e._v("Java 配置")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('/*\n * Configuration of the manager side\n */\n@Bean\npublic PartitionHandler partitionHandler() {\n MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();\n partitionHandler.setStepName("step1");\n partitionHandler.setGridSize(3);\n partitionHandler.setReplyChannel(outboundReplies());\n MessagingTemplate template = new MessagingTemplate();\n template.setDefaultChannel(outboundRequests());\n template.setReceiveTimeout(100000);\n partitionHandler.setMessagingOperations(template);\n return partitionHandler;\n}\n\n@Bean\npublic QueueChannel outboundReplies() {\n return new QueueChannel();\n}\n\n@Bean\npublic DirectChannel outboundRequests() {\n return new DirectChannel();\n}\n\n@Bean\npublic IntegrationFlow outboundJmsRequests() {\n return IntegrationFlows.from("outboundRequests")\n .handle(Jms.outboundGateway(connectionFactory())\n .requestDestination("requestsQueue"))\n .get();\n}\n\n@Bean\n@ServiceActivator(inputChannel = "inboundStaging")\npublic AggregatorFactoryBean partitioningMessageHandler() throws Exception {\n AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();\n aggregatorFactoryBean.setProcessorBean(partitionHandler());\n aggregatorFactoryBean.setOutputChannel(outboundReplies());\n // configure other propeties of the aggregatorFactoryBean\n return aggregatorFactoryBean;\n}\n\n@Bean\npublic DirectChannel inboundStaging() {\n return new DirectChannel();\n}\n\n@Bean\npublic IntegrationFlow inboundJmsStaging() {\n return IntegrationFlows\n .from(Jms.messageDrivenChannelAdapter(connectionFactory())\n .configureListenerContainer(c -> c.subscriptionDurable(false))\n .destination("stagingQueue"))\n .channel(inboundStaging())\n .get();\n}\n\n/*\n * Configuration of the worker side\n */\n@Bean\npublic StepExecutionRequestHandler stepExecutionRequestHandler() {\n StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();\n stepExecutionRequestHandler.setJobExplorer(jobExplorer);\n stepExecutionRequestHandler.setStepLocator(stepLocator());\n return stepExecutionRequestHandler;\n}\n\n@Bean\n@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")\npublic StepExecutionRequestHandler serviceActivator() throws Exception {\n return stepExecutionRequestHandler();\n}\n\n@Bean\npublic DirectChannel inboundRequests() {\n return new DirectChannel();\n}\n\npublic IntegrationFlow inboundJmsRequests() {\n return IntegrationFlows\n .from(Jms.messageDrivenChannelAdapter(connectionFactory())\n .configureListenerContainer(c -> c.subscriptionDurable(false))\n .destination("requestsQueue"))\n .channel(inboundRequests())\n .get();\n}\n\n@Bean\npublic DirectChannel outboundStaging() {\n return new DirectChannel();\n}\n\n@Bean\npublic IntegrationFlow outboundJmsStaging() {\n return IntegrationFlows.from("outboundStaging")\n .handle(Jms.outboundGateway(connectionFactory())\n .requestDestination("stagingQueue"))\n .get();\n}\n')])])]),t("p",[e._v("还必须确保分区"),t("code",[e._v("handler")]),e._v("属性映射到"),t("code",[e._v("partitionHandler")]),e._v(" Bean。")]),e._v(" "),t("p",[e._v("下面的示例将分区"),t("code",[e._v("handler")]),e._v("属性映射到 XML 中的"),t("code",[e._v("partitionHandler")]),e._v(":")]),e._v(" "),t("p",[e._v("XML 配置")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\n \n \n ...\n \n\n')])])]),t("p",[e._v("下面的示例将分区"),t("code",[e._v("handler")]),e._v("属性映射到 Java 中的"),t("code",[e._v("partitionHandler")]),e._v(":")]),e._v(" "),t("p",[e._v("Java 配置")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\tpublic Job personJob() {\n\t\treturn jobBuilderFactory.get("personJob")\n\t\t\t\t.start(stepBuilderFactory.get("step1.manager")\n\t\t\t\t\t\t.partitioner("step1.worker", partitioner())\n\t\t\t\t\t\t.partitionHandler(partitionHandler())\n\t\t\t\t\t\t.build())\n\t\t\t\t.build();\n\t}\n')])])]),t("p",[e._v("你可以找到远程分区作业"),t("a",{attrs:{href:"https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples#remote-partitioning-sample",target:"_blank",rel:"noopener noreferrer"}},[e._v("here"),t("OutboundLink")],1),e._v("的完整示例。")]),e._v(" "),t("p",[e._v("可以用来简化远程分区设置的"),t("code",[e._v("@EnableBatchIntegration")]),e._v("注释。这个注释为远程分区提供了两个有用的 bean:")]),e._v(" "),t("ul",[t("li",[t("p",[t("code",[e._v("RemotePartitioningManagerStepBuilderFactory")]),e._v(":用于配置 Manager 步骤")])]),e._v(" "),t("li",[t("p",[t("code",[e._v("RemotePartitioningWorkerStepBuilderFactory")]),e._v(":用于配置工作步骤")])])]),e._v(" "),t("p",[e._v("这些 API 负责配置一些组件,如下图所示:")]),e._v(" "),t("p",[t("img",{attrs:{src:"https://docs.spring.io/spring-batch/docs/current/reference/html/images/remote-partitioning-polling-config.png",alt:"远程分区配置(使用作业存储库轮询)"}})]),e._v(" "),t("p",[e._v("图 6。远程分区配置(使用作业存储库轮询)")]),e._v(" "),t("p",[t("img",{attrs:{src:"https://docs.spring.io/spring-batch/docs/current/reference/html/images/remote-partitioning-aggregation-config.png",alt:"远程分区配置(带有回复聚合)"}})]),e._v(" "),t("p",[e._v("图 7。远程分区配置(带有回复聚合)")]),e._v(" "),t("p",[e._v("在 Manager 方面,"),t("code",[e._v("RemotePartitioningManagerStepBuilderFactory")]),e._v("允许你通过声明以下内容来配置 Manager:")]),e._v(" "),t("ul",[t("li",[t("p",[e._v("用于划分数据的"),t("code",[e._v("Partitioner")])])]),e._v(" "),t("li",[t("p",[e._v("将请求发送给工作人员的输出通道(“传出请求”)")])]),e._v(" "),t("li",[t("p",[e._v("输入通道(“传入回复”)以接收来自工作人员的回复(在配置回复聚合时)")])]),e._v(" "),t("li",[t("p",[e._v("轮询间隔和超时参数(在配置作业存储库轮询时)")])])]),e._v(" "),t("p",[e._v("不需要显式配置"),t("code",[e._v("MessageChannelPartitionHandler")]),e._v("和"),t("code",[e._v("MessagingTemplate")]),e._v("(如果需要,仍然可以显式配置这些参数)。")]),e._v(" "),t("p",[e._v("在 worker 方面,"),t("code",[e._v("RemotePartitioningWorkerStepBuilderFactory")]),e._v("允许你将 worker 配置为:")]),e._v(" "),t("ul",[t("li",[t("p",[e._v("监听 Manager 在输入通道上发送的请求(“传入请求”)")])]),e._v(" "),t("li",[t("p",[e._v("对于每个请求调用"),t("code",[e._v("StepExecutionRequestHandler")]),e._v("的"),t("code",[e._v("handle")]),e._v("方法")])]),e._v(" "),t("li",[t("p",[e._v("将输出通道上的回复(“输出回复”)发送给 Manager")])])]),e._v(" "),t("p",[e._v("不需要显式配置"),t("code",[e._v("StepExecutionRequestHandler")]),e._v("(如果需要,可以显式配置)。")]),e._v(" "),t("p",[e._v("下面的示例展示了如何使用这些 API:")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('@Configuration\n@EnableBatchProcessing\n@EnableBatchIntegration\npublic class RemotePartitioningJobConfiguration {\n\n @Configuration\n public static class ManagerConfiguration {\n\n @Autowired\n private RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;\n\n @Bean\n public Step managerStep() {\n return this.managerStepBuilderFactory\n .get("managerStep")\n .partitioner("workerStep", partitioner())\n .gridSize(10)\n .outputChannel(outgoingRequestsToWorkers())\n .inputChannel(incomingRepliesFromWorkers())\n .build();\n }\n\n // Middleware beans setup omitted\n\n }\n\n @Configuration\n public static class WorkerConfiguration {\n\n @Autowired\n private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;\n\n @Bean\n public Step workerStep() {\n return this.workerStepBuilderFactory\n .get("workerStep")\n .inputChannel(incomingRequestsFromManager())\n .outputChannel(outgoingRepliesToManager())\n .chunk(100)\n .reader(itemReader())\n .processor(itemProcessor())\n .writer(itemWriter())\n .build();\n }\n\n // Middleware beans setup omitted\n\n }\n\n}\n')])])])])}),[],!1,null,null,null);n.default=r.exports}}]);