(window.webpackJsonp=window.webpackJsonp||[]).push([[36],{464:function(e,n,t){"use strict";t.r(n);var a=t(56),o=Object(a.a)({},(function(){var e=this,n=e.$createElement,t=e._self._c||n;return t("ContentSlotsDistributor",{attrs:{"slot-key":e.$parent.slotKey}},[t("h1",{attrs:{id:"spring-batch-integration"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#spring-batch-integration"}},[e._v("#")]),e._v(" Spring Batch Integration")]),e._v(" "),t("h2",{attrs:{id:"spring-batch-integration-2"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#spring-batch-integration-2"}},[e._v("#")]),e._v(" Spring Batch Integration")]),e._v(" "),t("p",[e._v("XMLJavaBoth")]),e._v(" "),t("h3",{attrs:{id:"spring-batch-integration-introduction"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#spring-batch-integration-introduction"}},[e._v("#")]),e._v(" Spring Batch Integration Introduction")]),e._v(" "),t("p",[e._v("Many users of Spring Batch may encounter requirements that are\noutside the scope of Spring Batch but that may be efficiently and\nconcisely implemented by using Spring Integration. Conversely, Spring\nIntegration users may encounter Spring Batch requirements and need a way\nto efficiently integrate both frameworks. In this context, several\npatterns and use-cases emerge, and Spring Batch Integration\naddresses those requirements.")]),e._v(" "),t("p",[e._v("The line between Spring Batch and Spring Integration is not always\nclear, but two pieces of advice can\nhelp: Think about granularity, and apply common patterns. Some\nof those common patterns are described in this reference manual\nsection.")]),e._v(" "),t("p",[e._v("Adding messaging to a batch process enables automation of\noperations and also separation and strategizing of key concerns.\nFor example, a message might trigger a job to execute, and then the\nsending of the message can be exposed in a variety of ways. 
Alternatively, when\na job completes or fails, that event might trigger a message to be sent,\nand the consumers of those messages might have operational concerns\nthat have nothing to do with the application itself. Messaging can\nalso be embedded in a job (for example reading or writing items for\nprocessing via channels). Remote partitioning and remote chunking\nprovide methods to distribute workloads over a number of workers.")]),e._v(" "),t("p",[e._v("This section covers the following key concepts:")]),e._v(" "),t("ul",[t("li",[t("p",[t("a",{attrs:{href:"#namespace-support"}},[e._v("Namespace Support")])])]),e._v(" "),t("li",[t("p",[t("a",{attrs:{href:"#launching-batch-jobs-through-messages"}},[e._v("Launching Batch Jobs through Messages")])])]),e._v(" "),t("li",[t("p",[t("a",{attrs:{href:"#providing-feedback-with-informational-messages"}},[e._v("Providing Feedback with Informational Messages")])])]),e._v(" "),t("li",[t("p",[t("a",{attrs:{href:"#asynchronous-processors"}},[e._v("Asynchronous Processors")])])]),e._v(" "),t("li",[t("p",[t("a",{attrs:{href:"#externalizing-batch-process-execution"}},[e._v("Externalizing\nBatch Process Execution")])])])]),e._v(" "),t("h4",{attrs:{id:"namespace-support"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#namespace-support"}},[e._v("#")]),e._v(" Namespace Support")]),e._v(" "),t("p",[e._v("Since Spring Batch Integration 1.3, dedicated XML Namespace\nsupport was added, with the aim to provide an easier configuration\nexperience. 
In order to activate the namespace, add the following\nnamespace declarations to your Spring XML Application Context\nfile:")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\n\n ...\n\n\n')])])]),t("p",[e._v("A fully configured Spring XML Application Context file for Spring\nBatch Integration may look like the following:")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\n\n ...\n\n\n')])])]),t("p",[e._v("Appending version numbers to the referenced XSD file is also\nallowed, but, as a version-less declaration always uses the\nlatest schema, we generally do not recommend appending the version\nnumber to the XSD name. Adding a version number\ncould possibly create issues when updating the Spring Batch\nIntegration dependencies, as they may require more recent versions\nof the XML schema.")]),e._v(" "),t("h4",{attrs:{id:"launching-batch-jobs-through-messages"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#launching-batch-jobs-through-messages"}},[e._v("#")]),e._v(" Launching Batch Jobs through Messages")]),e._v(" "),t("p",[e._v("When starting batch jobs by using the core Spring Batch API, you\nbasically have 2 options:")]),e._v(" "),t("ul",[t("li",[t("p",[e._v("From the command line, with the "),t("code",[e._v("CommandLineJobRunner")])])]),e._v(" "),t("li",[t("p",[e._v("Programmatically, with either "),t("code",[e._v("JobOperator.start()")]),e._v(" or "),t("code",[e._v("JobLauncher.run()")])])])]),e._v(" "),t("p",[e._v("For example, you may want to use the"),t("code",[e._v("CommandLineJobRunner")]),e._v(" when invoking Batch Jobs by\nusing a shell script. Alternatively, you may use the"),t("code",[e._v("JobOperator")]),e._v(" directly (for example, when using\nSpring Batch as part of a web application). However, what about\nmore complex use cases? 
Maybe you need to poll a remote (S)FTP\nserver to retrieve the data for the Batch Job or your application\nhas to support multiple different data sources simultaneously. For\nexample, you may receive data files not only from the web, but also from\nFTP and other sources. Maybe additional transformation of the input files is\nneeded before invoking Spring Batch.")]),e._v(" "),t("p",[e._v("Therefore, it would be much more powerful to execute the batch job\nusing Spring Integration and its numerous adapters. For example,\nyou can use a "),t("em",[e._v("File Inbound Channel Adapter")]),e._v(" to\nmonitor a directory in the file-system and start the Batch Job as\nsoon as the input file arrives. Additionally, you can create Spring\nIntegration flows that use multiple different adapters to easily\ningest data for your batch jobs from multiple sources\nsimultaneously using only configuration. Implementing all these\nscenarios with Spring Integration is easy, as it allows for\ndecoupled, event-driven execution of the"),t("code",[e._v("JobLauncher")]),e._v(".")]),e._v(" "),t("p",[e._v("Spring Batch Integration provides the"),t("code",[e._v("JobLaunchingMessageHandler")]),e._v(" class that you can\nuse to launch batch jobs. The input for the"),t("code",[e._v("JobLaunchingMessageHandler")]),e._v(" is provided by a\nSpring Integration message, which has a payload of type"),t("code",[e._v("JobLaunchRequest")]),e._v(". This class is a wrapper around the "),t("code",[e._v("Job")]),e._v("that needs to be launched and around the "),t("code",[e._v("JobParameters")]),e._v("necessary to launch the Batch job.")]),e._v(" "),t("p",[e._v("The following image illustrates the typical Spring Integration\nmessage flow in order to start a Batch job. 
The"),t("a",{attrs:{href:"https://www.enterpriseintegrationpatterns.com/toc.html",target:"_blank",rel:"noopener noreferrer"}},[e._v("EIP (Enterprise Integration Patterns) website"),t("OutboundLink")],1),e._v("provides a full overview of messaging icons and their descriptions.")]),e._v(" "),t("p",[t("img",{attrs:{src:"https://docs.spring.io/spring-batch/docs/current/reference/html/images/launch-batch-job.png",alt:"Launch Batch Job"}})]),e._v(" "),t("p",[e._v("Figure 1. Launch Batch Job")]),e._v(" "),t("h5",{attrs:{id:"transforming-a-file-into-a-joblaunchrequest"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#transforming-a-file-into-a-joblaunchrequest"}},[e._v("#")]),e._v(" Transforming a file into a JobLaunchRequest")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("package io.spring.sbi;\n\nimport org.springframework.batch.core.Job;\nimport org.springframework.batch.core.JobParametersBuilder;\nimport org.springframework.batch.integration.launch.JobLaunchRequest;\nimport org.springframework.integration.annotation.Transformer;\nimport org.springframework.messaging.Message;\n\nimport java.io.File;\n\npublic class FileMessageToJobRequest {\n private Job job;\n private String fileParameterName;\n\n public void setFileParameterName(String fileParameterName) {\n this.fileParameterName = fileParameterName;\n }\n\n public void setJob(Job job) {\n this.job = job;\n }\n\n @Transformer\n public JobLaunchRequest toRequest(Message message) {\n JobParametersBuilder jobParametersBuilder =\n new JobParametersBuilder();\n\n jobParametersBuilder.addString(fileParameterName,\n message.getPayload().getAbsolutePath());\n\n return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());\n }\n}\n")])])]),t("h5",{attrs:{id:"the-jobexecution-response"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#the-jobexecution-response"}},[e._v("#")]),e._v(" The "),t("code",[e._v("JobExecution")]),e._v(" 
Response")]),e._v(" "),t("p",[e._v("When a batch job is being executed, a"),t("code",[e._v("JobExecution")]),e._v(" instance is returned. This\ninstance can be used to determine the status of an execution. If\na "),t("code",[e._v("JobExecution")]),e._v(" is able to be created\nsuccessfully, it is always returned, regardless of whether\nor not the actual execution is successful.")]),e._v(" "),t("p",[e._v("The exact behavior on how the "),t("code",[e._v("JobExecution")]),e._v("instance is returned depends on the provided"),t("code",[e._v("TaskExecutor")]),e._v(". If a"),t("code",[e._v("synchronous")]),e._v(" (single-threaded)"),t("code",[e._v("TaskExecutor")]),e._v(" implementation is used, the"),t("code",[e._v("JobExecution")]),e._v(" response is returned only"),t("code",[e._v("after")]),e._v(" the job completes. When using an"),t("code",[e._v("asynchronous``TaskExecutor")]),e._v(", the"),t("code",[e._v("JobExecution")]),e._v(" instance is returned\nimmediately. Users can then take the "),t("code",[e._v("id")]),e._v(" of"),t("code",[e._v("JobExecution")]),e._v(" instance\n(with "),t("code",[e._v("JobExecution.getJobId()")]),e._v(") and query the"),t("code",[e._v("JobRepository")]),e._v(" for the job’s updated status\nusing the "),t("code",[e._v("JobExplorer")]),e._v(". 
For more\ninformation, please refer to the Spring\nBatch reference documentation on"),t("RouterLink",{attrs:{to:"/en/spring-batch/job.html#queryingRepository"}},[e._v("Querying the Repository")]),e._v(".")],1),e._v(" "),t("h5",{attrs:{id:"spring-batch-integration-configuration"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#spring-batch-integration-configuration"}},[e._v("#")]),e._v(" Spring Batch Integration Configuration")]),e._v(" "),t("p",[e._v("Consider a case where someone needs to create a file "),t("code",[e._v("inbound-channel-adapter")]),e._v(" to listen\nfor CSV files in the provided directory, hand them off to a transformer\n("),t("code",[e._v("FileMessageToJobRequest")]),e._v("), launch the job through the "),t("em",[e._v("Job Launching Gateway")]),e._v(", and then\nlog the output of the "),t("code",[e._v("JobExecution")]),e._v(" with the "),t("code",[e._v("logging-channel-adapter")]),e._v(".")]),e._v(" "),t("p",[e._v("The following example shows how that common case can be configured in XML:")]),e._v(" "),t("p",[e._v("XML Configuration")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\n\n\n\n\n \n\n\n\n \n \n \n \n\n\n\n\n\n')])])]),t("p",[e._v("The following example shows how that common case can be configured in Java:")]),e._v(" "),t("p",[e._v("Java Configuration")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('@Bean\npublic FileMessageToJobRequest fileMessageToJobRequest() {\n FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();\n fileMessageToJobRequest.setFileParameterName("input.file.name");\n fileMessageToJobRequest.setJob(personJob());\n return fileMessageToJobRequest;\n}\n\n@Bean\npublic JobLaunchingGateway jobLaunchingGateway() {\n SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();\n simpleJobLauncher.setJobRepository(jobRepository);\n 
simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());\n JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);\n\n return jobLaunchingGateway;\n}\n\n@Bean\npublic IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {\n return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/myfiles")).\n filter(new SimplePatternFileListFilter("*.csv")),\n c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).\n transform(fileMessageToJobRequest()).\n handle(jobLaunchingGateway).\n log(LoggingHandler.Level.WARN, "headers.id + \': \' + payload").\n get();\n}\n')])])]),t("h5",{attrs:{id:"example-itemreader-configuration"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#example-itemreader-configuration"}},[e._v("#")]),e._v(" Example ItemReader Configuration")]),e._v(" "),t("p",[e._v("Now that we are polling for files and launching jobs, we need to configure our Spring\nBatch "),t("code",[e._v("ItemReader")]),e._v(' (for example) to use the files found at the location defined by the job\nparameter called "input.file.name", as shown in the following bean configuration:')]),e._v(" "),t("p",[e._v("The following XML example shows the necessary bean configuration:")]),e._v(" "),t("p",[e._v("XML Configuration")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\n \n ...\n\n')])])]),t("p",[e._v("The following Java example shows the necessary bean configuration:")]),e._v(" "),t("p",[e._v("Java Configuration")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('@Bean\n@StepScope\npublic ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {\n...\n FlatFileItemReader flatFileItemReader = new FlatFileItemReader();\n flatFileItemReader.setResource(new FileSystemResource(resource));\n...\n return flatFileItemReader;\n}\n')])])]),t("p",[e._v("The main 
points of interest in the preceding example are injecting the value of"),t("code",[e._v("#{jobParameters['input.file.name']}")]),e._v("as the Resource property value and setting the "),t("code",[e._v("ItemReader")]),e._v(" bean\nto have "),t("em",[e._v("Step scope")]),e._v(". Setting the bean to have Step scope takes advantage of\nthe late binding support, which allows access to the"),t("code",[e._v("jobParameters")]),e._v(" variable.")]),e._v(" "),t("h3",{attrs:{id:"available-attributes-of-the-job-launching-gateway"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#available-attributes-of-the-job-launching-gateway"}},[e._v("#")]),e._v(" Available Attributes of the Job-Launching Gateway")]),e._v(" "),t("p",[e._v("The job-launching gateway has the following attributes that you can set to control a job:")]),e._v(" "),t("ul",[t("li",[t("p",[t("code",[e._v("id")]),e._v(": Identifies the underlying Spring bean definition, which is an instance of either:")]),e._v(" "),t("ul",[t("li",[t("p",[t("code",[e._v("EventDrivenConsumer")])])]),e._v(" "),t("li",[t("p",[t("code",[e._v("PollingConsumer")]),e._v("(The exact implementation depends on whether the component’s input channel is a"),t("code",[e._v("SubscribableChannel")]),e._v(" or "),t("code",[e._v("PollableChannel")]),e._v(".)")])])])]),e._v(" "),t("li",[t("p",[t("code",[e._v("auto-startup")]),e._v(": Boolean flag to indicate that the endpoint should start automatically on\nstartup. 
The default is "),t("em",[e._v("true")]),e._v(".")])]),e._v(" "),t("li",[t("p",[t("code",[e._v("request-channel")]),e._v(": The input "),t("code",[e._v("MessageChannel")]),e._v(" of this endpoint.")])]),e._v(" "),t("li",[t("p",[t("code",[e._v("reply-channel")]),e._v(": "),t("code",[e._v("MessageChannel")]),e._v(" to which the resulting "),t("code",[e._v("JobExecution")]),e._v(" payload is sent.")])]),e._v(" "),t("li",[t("p",[t("code",[e._v("reply-timeout")]),e._v(": Lets you specify how long (in milliseconds) this gateway waits for the reply message\nto be sent successfully to the reply channel before throwing\nan exception. This attribute only applies when the channel\nmight block (for example, when using a bounded queue channel\nthat is currently full). Also, keep in mind that, when sending to a"),t("code",[e._v("DirectChannel")]),e._v(", the invocation occurs\nin the sender’s thread. Therefore, the failing of the send\noperation may be caused by other components further downstream.\nThe "),t("code",[e._v("reply-timeout")]),e._v(" attribute maps to the"),t("code",[e._v("sendTimeout")]),e._v(" property of the underlying"),t("code",[e._v("MessagingTemplate")]),e._v(" instance. If not specified, the attribute\ndefaults to-1,\nmeaning that, by default, the "),t("code",[e._v("Gateway")]),e._v(" waits indefinitely.")])]),e._v(" "),t("li",[t("p",[t("code",[e._v("job-launcher")]),e._v(": Optional. Accepts a\ncustom"),t("code",[e._v("JobLauncher")]),e._v("bean reference.\nIf not specified the adapter\nre-uses the instance that is registered under the "),t("code",[e._v("id")]),e._v(" of"),t("code",[e._v("jobLauncher")]),e._v(". 
If no default instance\nexists, an exception is thrown.")])]),e._v(" "),t("li",[t("p",[t("code",[e._v("order")]),e._v(": Specifies the order of invocation when this endpoint is connected as a subscriber\nto a "),t("code",[e._v("SubscribableChannel")]),e._v(".")])])]),e._v(" "),t("h3",{attrs:{id:"sub-elements"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#sub-elements"}},[e._v("#")]),e._v(" Sub-Elements")]),e._v(" "),t("p",[e._v("When this "),t("code",[e._v("Gateway")]),e._v(" is receiving messages from a"),t("code",[e._v("PollableChannel")]),e._v(", you must either provide\na global default "),t("code",[e._v("Poller")]),e._v(" or provide a "),t("code",[e._v("Poller")]),e._v(" sub-element to the"),t("code",[e._v("Job Launching Gateway")]),e._v(".")]),e._v(" "),t("p",[e._v("The following example shows how to provide a poller in XML:")]),e._v(" "),t("p",[e._v("XML Configuration")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\n \n\n')])])]),t("p",[e._v("The following example shows how to provide a poller in Java:")]),e._v(" "),t("p",[e._v("Java Configuration")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('@Bean\n@ServiceActivator(inputChannel = "queueChannel", poller = @Poller(fixedRate="1000"))\npublic JobLaunchingGateway sampleJobLaunchingGateway() {\n JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher());\n jobLaunchingGateway.setOutputChannel(replyChannel());\n return jobLaunchingGateway;\n}\n')])])]),t("h4",{attrs:{id:"providing-feedback-with-informational-messages"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#providing-feedback-with-informational-messages"}},[e._v("#")]),e._v(" Providing Feedback with Informational Messages")]),e._v(" "),t("p",[e._v("As Spring Batch jobs can run for long times, providing progress\ninformation is often critical. 
For example, stake-holders may want\nto be notified if some or all parts of a batch job have failed.\nSpring Batch provides support for this information being gathered\nthrough:")]),e._v(" "),t("ul",[t("li",[t("p",[e._v("Active polling")])]),e._v(" "),t("li",[t("p",[e._v("Event-driven listeners")])])]),e._v(" "),t("p",[e._v("When starting a Spring Batch job asynchronously (for example, by using the "),t("code",[e._v("Job Launching Gateway")]),e._v("), a "),t("code",[e._v("JobExecution")]),e._v(" instance is returned. Thus, "),t("code",[e._v("JobExecution.getJobId()")]),e._v(" can be\nused to continuously poll for status updates by retrieving updated instances of the"),t("code",[e._v("JobExecution")]),e._v(" from the "),t("code",[e._v("JobRepository")]),e._v(" by using the "),t("code",[e._v("JobExplorer")]),e._v(". However, this is\nconsidered sub-optimal, and an event-driven approach should be preferred.")]),e._v(" "),t("p",[e._v("Therefore, Spring Batch provides listeners, including the three most commonly used\nlisteners:")]),e._v(" "),t("ul",[t("li",[t("p",[t("code",[e._v("StepListener")])])]),e._v(" "),t("li",[t("p",[t("code",[e._v("ChunkListener")])])]),e._v(" "),t("li",[t("p",[t("code",[e._v("JobExecutionListener")])])])]),e._v(" "),t("p",[e._v("In the example shown in the following image, a Spring Batch job has been configured with a"),t("code",[e._v("StepExecutionListener")]),e._v(". Thus, Spring Integration receives and processes any step before\nor after events. For example, the received "),t("code",[e._v("StepExecution")]),e._v(" can be inspected by using a"),t("code",[e._v("Router")]),e._v(". 
Based on the results of that inspection, various things can occur (such as\nrouting a message to a Mail Outbound Channel Adapter), so that an Email notification can\nbe sent out based on some condition.")]),e._v(" "),t("p",[t("img",{attrs:{src:"https://docs.spring.io/spring-batch/docs/current/reference/html/images/handling-informational-messages.png",alt:"Handling Informational Messages"}})]),e._v(" "),t("p",[e._v("Figure 2. Handling Informational Messages")]),e._v(" "),t("p",[e._v("The following two-part example shows how a listener is configured to send a\nmessage to a "),t("code",[e._v("Gateway")]),e._v(" for a "),t("code",[e._v("StepExecution")]),e._v(" events and log its output to a"),t("code",[e._v("logging-channel-adapter")]),e._v(".")]),e._v(" "),t("p",[e._v("First, create the notification integration beans.")]),e._v(" "),t("p",[e._v("The following example shows the how to create the notification integration beans in XML:")]),e._v(" "),t("p",[e._v("XML Configuration")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\n\n\n\n\n')])])]),t("p",[e._v("The following example shows the how to create the notification integration beans in Java:")]),e._v(" "),t("p",[e._v("Java Configuration")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('@Bean\n@ServiceActivator(inputChannel = "stepExecutionsChannel")\npublic LoggingHandler loggingHandler() {\n LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.WARN);\n adapter.setLoggerName("TEST_LOGGER");\n adapter.setLogExpressionString("headers.id + \': \' + payload");\n return adapter;\n}\n\n@MessagingGateway(name = "notificationExecutionsListener", defaultRequestChannel = "stepExecutionsChannel")\npublic interface NotificationExecutionListener extends StepExecutionListener {}\n')])])]),t("table",[t("thead",[t("tr",[t("th"),e._v(" "),t("th",[e._v("You need to add the 
"),t("code",[e._v("@IntegrationComponentScan")]),e._v(" annotation to your configuration.")])])]),e._v(" "),t("tbody")]),e._v(" "),t("p",[e._v("Second, modify your job to add a step-level listener.")]),e._v(" "),t("p",[e._v("The following example shows the how to add a step-level listener in XML:")]),e._v(" "),t("p",[e._v("XML Configuration")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\n \n \n \n \n \n \n \n ...\n \n\n')])])]),t("p",[e._v("The following example shows the how to add a step-level listener in Java:")]),e._v(" "),t("p",[e._v("Java Configuration")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('public Job importPaymentsJob() {\n return jobBuilderFactory.get("importPayments")\n .start(stepBuilderFactory.get("step1")\n .chunk(200)\n .listener(notificationExecutionsListener())\n ...\n}\n')])])]),t("h4",{attrs:{id:"asynchronous-processors"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#asynchronous-processors"}},[e._v("#")]),e._v(" Asynchronous Processors")]),e._v(" "),t("p",[e._v("Asynchronous Processors help you to scale the processing of items. In the asynchronous\nprocessor use case, an "),t("code",[e._v("AsyncItemProcessor")]),e._v(" serves as a dispatcher, executing the logic of\nthe "),t("code",[e._v("ItemProcessor")]),e._v(" for an item on a new thread. Once the item completes, the "),t("code",[e._v("Future")]),e._v(" is\npassed to the "),t("code",[e._v("AsynchItemWriter")]),e._v(" to be written.")]),e._v(" "),t("p",[e._v("Therefore, you can increase performance by using asynchronous item processing, basically\nletting you implement "),t("em",[e._v("fork-join")]),e._v(" scenarios. 
The "),t("code",[e._v("AsyncItemWriter")]),e._v(" gathers the results and\nwrites back the chunk as soon as all the results become available.")]),e._v(" "),t("p",[e._v("The following example shows how to configuration the "),t("code",[e._v("AsyncItemProcessor")]),e._v(" in XML:")]),e._v(" "),t("p",[e._v("XML Configuration")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\n \n \n \n \n \n \n\n')])])]),t("p",[e._v("The following example shows how to configuration the "),t("code",[e._v("AsyncItemProcessor")]),e._v(" in XML:")]),e._v(" "),t("p",[e._v("Java Configuration")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("@Bean\npublic AsyncItemProcessor processor(ItemProcessor itemProcessor, TaskExecutor taskExecutor) {\n AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor();\n asyncItemProcessor.setTaskExecutor(taskExecutor);\n asyncItemProcessor.setDelegate(itemProcessor);\n return asyncItemProcessor;\n}\n")])])]),t("p",[e._v("The "),t("code",[e._v("delegate")]),e._v(" property refers to your "),t("code",[e._v("ItemProcessor")]),e._v(" bean, and the "),t("code",[e._v("taskExecutor")]),e._v("property refers to the "),t("code",[e._v("TaskExecutor")]),e._v(" of your choice.")]),e._v(" "),t("p",[e._v("The following example shows how to configure the "),t("code",[e._v("AsyncItemWriter")]),e._v(" in XML:")]),e._v(" "),t("p",[e._v("XML Configuration")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\n \n \n \n\n')])])]),t("p",[e._v("The following example shows how to configure the "),t("code",[e._v("AsyncItemWriter")]),e._v(" in Java:")]),e._v(" "),t("p",[e._v("Java Configuration")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v("@Bean\npublic AsyncItemWriter 
writer(ItemWriter itemWriter) {\n AsyncItemWriter asyncItemWriter = new AsyncItemWriter();\n asyncItemWriter.setDelegate(itemWriter);\n return asyncItemWriter;\n}\n")])])]),t("p",[e._v("Again, the "),t("code",[e._v("delegate")]),e._v(" property is\nactually a reference to your "),t("code",[e._v("ItemWriter")]),e._v(" bean.")]),e._v(" "),t("h4",{attrs:{id:"externalizing-batch-process-execution"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#externalizing-batch-process-execution"}},[e._v("#")]),e._v(" Externalizing Batch Process Execution")]),e._v(" "),t("p",[e._v("The integration approaches discussed so far suggest use cases\nwhere Spring Integration wraps Spring Batch like an outer-shell.\nHowever, Spring Batch can also use Spring Integration internally.\nUsing this approach, Spring Batch users can delegate the\nprocessing of items or even chunks to outside processes. This\nallows you to offload complex processing. Spring Batch Integration\nprovides dedicated support for:")]),e._v(" "),t("ul",[t("li",[t("p",[e._v("Remote Chunking")])]),e._v(" "),t("li",[t("p",[e._v("Remote Partitioning")])])]),e._v(" "),t("h5",{attrs:{id:"remote-chunking"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#remote-chunking"}},[e._v("#")]),e._v(" Remote Chunking")]),e._v(" "),t("p",[t("img",{attrs:{src:"https://docs.spring.io/spring-batch/docs/current/reference/html/images/remote-chunking-sbi.png",alt:"Remote Chunking"}})]),e._v(" "),t("p",[e._v("Figure 3. Remote Chunking")]),e._v(" "),t("p",[e._v("Taking things one step further, one can also externalize the\nchunk processing by using the"),t("code",[e._v("ChunkMessageChannelItemWriter")]),e._v("(provided by Spring Batch Integration), which sends items out\nand collects the result. 
Once sent, Spring Batch continues the\nprocess of reading and grouping items, without waiting for the results.\nRather, it is the responsibility of the "),t("code",[e._v("ChunkMessageChannelItemWriter")]),e._v("to gather the results and integrate them back into the Spring Batch process.")]),e._v(" "),t("p",[e._v("With Spring Integration, you have full\ncontrol over the concurrency of your processes (for instance, by\nusing a "),t("code",[e._v("QueueChannel")]),e._v(" instead of a"),t("code",[e._v("DirectChannel")]),e._v("). Furthermore, by relying on\nSpring Integration’s rich collection of Channel Adapters (such as\nJMS and AMQP), you can distribute chunks of a Batch job to\nexternal systems for processing.")]),e._v(" "),t("p",[e._v("A job with a step to be remotely chunked might have a configuration similar to the\nfollowing in XML:")]),e._v(" "),t("p",[e._v("XML Configuration")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\n \n \n \n \n ...\n \n\n')])])]),t("p",[e._v("A job with a step to be remotely chunked might have a configuration similar to the\nfollowing in Java:")]),e._v(" "),t("p",[e._v("Java Configuration")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('public Job chunkJob() {\n return jobBuilderFactory.get("personJob")\n .start(stepBuilderFactory.get("step1")\n .chunk(200)\n .reader(itemReader())\n .writer(itemWriter())\n .build())\n .build();\n }\n')])])]),t("p",[e._v("The "),t("code",[e._v("ItemReader")]),e._v(" reference points to the bean you want to use for reading data on the\nmanager. The "),t("code",[e._v("ItemWriter")]),e._v(" reference points to a special "),t("code",[e._v("ItemWriter")]),e._v(" (called"),t("code",[e._v("ChunkMessageChannelItemWriter")]),e._v("), as described above. The processor (if any) is left off\nthe manager configuration, as it is configured on the worker. 
You should check any\nadditional component properties, such as throttle limits and so on, when implementing\nyour use case.")]),e._v(" "),t("p",[e._v("The following XML configuration provides a basic manager setup:")]),e._v(" "),t("p",[e._v("XML Configuration")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\n \n\n\n\n\n\n \n \n\n\n\n \n \n\n\n\n \n\n\n\n')])])]),t("p",[e._v("The following Java configuration provides a basic manager setup:")]),e._v(" "),t("p",[e._v("Java Configuration")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('@Bean\npublic org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {\n ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();\n factory.setBrokerURL("tcp://localhost:61616");\n return factory;\n}\n\n/*\n * Configure outbound flow (requests going to workers)\n */\n@Bean\npublic DirectChannel requests() {\n return new DirectChannel();\n}\n\n@Bean\npublic IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {\n return IntegrationFlows\n .from(requests())\n .handle(Jms.outboundAdapter(connectionFactory).destination("requests"))\n .get();\n}\n\n/*\n * Configure inbound flow (replies coming from workers)\n */\n@Bean\npublic QueueChannel replies() {\n return new QueueChannel();\n}\n\n@Bean\npublic IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {\n return IntegrationFlows\n .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))\n .channel(replies())\n .get();\n}\n\n/*\n * Configure the ChunkMessageChannelItemWriter\n */\n@Bean\npublic ItemWriter itemWriter() {\n MessagingTemplate messagingTemplate = new MessagingTemplate();\n messagingTemplate.setDefaultChannel(requests());\n messagingTemplate.setReceiveTimeout(2000);\n ChunkMessageChannelItemWriter chunkMessageChannelItemWriter\n = new 
ChunkMessageChannelItemWriter<>();\n chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);\n chunkMessageChannelItemWriter.setReplyChannel(replies());\n return chunkMessageChannelItemWriter;\n}\n')])])]),t("p",[e._v("The preceding configuration provides us with a number of beans. We\nconfigure our messaging middleware using ActiveMQ and the\ninbound/outbound JMS adapters provided by Spring Integration. As\nshown, our "),t("code",[e._v("itemWriter")]),e._v(" bean, which is\nreferenced by our job step, uses the"),t("code",[e._v("ChunkMessageChannelItemWriter")]),e._v(" for writing chunks over the\nconfigured middleware.")]),e._v(" "),t("p",[e._v("Now we can move on to the worker configuration, as shown in the following example:")]),e._v(" "),t("p",[e._v("The following example shows the worker configuration in XML:")]),e._v(" "),t("p",[e._v("XML Configuration")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\n \n\n\n\n\n\n\n\n\n\n\n\n\n\n \n \n \n \n \n \n \n \n \n \n\n')])])]),t("p",[e._v("The following example shows the worker configuration in Java:")]),e._v(" "),t("p",[e._v("Java Configuration")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('@Bean\npublic org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {\n ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();\n factory.setBrokerURL("tcp://localhost:61616");\n return factory;\n}\n\n/*\n * Configure inbound flow (requests coming from the manager)\n */\n@Bean\npublic DirectChannel requests() {\n return new DirectChannel();\n}\n\n@Bean\npublic IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {\n return IntegrationFlows\n .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))\n .channel(requests())\n .get();\n}\n\n/*\n * Configure outbound flow (replies going to the 
manager)\n */\n@Bean\npublic DirectChannel replies() {\n return new DirectChannel();\n}\n\n@Bean\npublic IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {\n return IntegrationFlows\n .from(replies())\n .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))\n .get();\n}\n\n/*\n * Configure the ChunkProcessorChunkHandler\n */\n@Bean\n@ServiceActivator(inputChannel = "requests", outputChannel = "replies")\npublic ChunkProcessorChunkHandler chunkProcessorChunkHandler() {\n ChunkProcessor chunkProcessor\n = new SimpleChunkProcessor<>(itemProcessor(), itemWriter());\n ChunkProcessorChunkHandler chunkProcessorChunkHandler\n = new ChunkProcessorChunkHandler<>();\n chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);\n return chunkProcessorChunkHandler;\n}\n')])])]),t("p",[e._v("Most of these configuration items should look familiar from the\nmanager configuration. Workers do not need access to\nthe Spring Batch "),t("code",[e._v("JobRepository")]),e._v(" nor\nto the actual job configuration file. The main bean of interest\nis the "),t("code",[e._v("chunkProcessorChunkHandler")]),e._v(". 
The "),t("code",[e._v("chunkProcessor")]),e._v(" property of "),t("code",[e._v("ChunkProcessorChunkHandler")]),e._v(" takes a\nconfigured "),t("code",[e._v("SimpleChunkProcessor")]),e._v(", which is where you would provide a reference to your "),t("code",[e._v("ItemWriter")]),e._v(" (and, optionally, your "),t("code",[e._v("ItemProcessor")]),e._v(") that will run on the worker\nwhen it receives chunks from the manager.")]),e._v(" "),t("p",[e._v('For more information, see the section of the "Scalability" chapter on '),t("a",{attrs:{href:"https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html#remoteChunking",target:"_blank",rel:"noopener noreferrer"}},[e._v("Remote Chunking"),t("OutboundLink")],1),e._v(".")]),e._v(" "),t("p",[e._v("Starting from version 4.1, Spring Batch Integration introduces the "),t("code",[e._v("@EnableBatchIntegration")]),e._v(" annotation that can be used to simplify a remote chunking setup. This annotation provides\ntwo beans that can be autowired in the application context:")]),e._v(" "),t("ul",[t("li",[t("p",[t("code",[e._v("RemoteChunkingManagerStepBuilderFactory")]),e._v(": used to configure the manager step")])]),e._v(" "),t("li",[t("p",[t("code",[e._v("RemoteChunkingWorkerBuilder")]),e._v(": used to configure the remote worker integration flow")])])]),e._v(" "),t("p",[e._v("These APIs take care of configuring a number of components as described in the following diagram:")]),e._v(" "),t("p",[t("img",{attrs:{src:"https://docs.spring.io/spring-batch/docs/current/reference/html/images/remote-chunking-config.png",alt:"Remote Chunking Configuration"}})]),e._v(" "),t("p",[e._v("Figure 4. 
Remote Chunking Configuration")]),e._v(" "),t("p",[e._v("On the manager side, the "),t("code",[e._v("RemoteChunkingManagerStepBuilderFactory")]),e._v(" lets you\nconfigure a manager step by declaring:")]),e._v(" "),t("ul",[t("li",[t("p",[e._v("the item reader to read items and send them to workers")])]),e._v(" "),t("li",[t("p",[e._v('the output channel ("Outgoing requests") to send requests to workers')])]),e._v(" "),t("li",[t("p",[e._v('the input channel ("Incoming replies") to receive replies from workers')])])]),e._v(" "),t("p",[e._v("A "),t("code",[e._v("ChunkMessageChannelItemWriter")]),e._v(" and the "),t("code",[e._v("MessagingTemplate")]),e._v(" do not need to be explicitly configured\n(although they can still be explicitly configured if required).")]),e._v(" "),t("p",[e._v("On the worker side, the "),t("code",[e._v("RemoteChunkingWorkerBuilder")]),e._v(" allows you to configure a worker to:")]),e._v(" "),t("ul",[t("li",[t("p",[e._v('listen to requests sent by the manager on the input channel ("Incoming requests")')])]),e._v(" "),t("li",[t("p",[e._v("call the "),t("code",[e._v("handleChunk")]),e._v(" method of "),t("code",[e._v("ChunkProcessorChunkHandler")]),e._v(" for each request\nwith the configured "),t("code",[e._v("ItemProcessor")]),e._v(" and "),t("code",[e._v("ItemWriter")])])]),e._v(" "),t("li",[t("p",[e._v('send replies on the output channel ("Outgoing replies") to the manager')])])]),e._v(" "),t("p",[e._v("There is no need to explicitly configure the "),t("code",[e._v("SimpleChunkProcessor")]),e._v(" and the "),t("code",[e._v("ChunkProcessorChunkHandler")]),e._v(" (although they can still be explicitly configured if required).")]),e._v(" "),t("p",[e._v("The following example shows how to use these APIs:")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('@EnableBatchIntegration\n@EnableBatchProcessing\npublic class RemoteChunkingJobConfiguration {\n\n @Configuration\n public static class 
ManagerConfiguration {\n\n @Autowired\n private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;\n\n @Bean\n public TaskletStep managerStep() {\n return this.managerStepBuilderFactory.get("managerStep")\n .chunk(100)\n .reader(itemReader())\n .outputChannel(requests()) // requests sent to workers\n .inputChannel(replies()) // replies received from workers\n .build();\n }\n\n // Middleware beans setup omitted\n\n }\n\n @Configuration\n public static class WorkerConfiguration {\n\n @Autowired\n private RemoteChunkingWorkerBuilder workerBuilder;\n\n @Bean\n public IntegrationFlow workerFlow() {\n return this.workerBuilder\n .itemProcessor(itemProcessor())\n .itemWriter(itemWriter())\n .inputChannel(requests()) // requests received from the manager\n .outputChannel(replies()) // replies sent to the manager\n .build();\n }\n\n // Middleware beans setup omitted\n\n }\n\n}\n')])])]),t("p",[e._v("You can find a complete example of a remote chunking job "),t("a",{attrs:{href:"https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples#remote-chunking-sample",target:"_blank",rel:"noopener noreferrer"}},[e._v("here"),t("OutboundLink")],1),e._v(".")]),e._v(" "),t("h5",{attrs:{id:"remote-partitioning"}},[t("a",{staticClass:"header-anchor",attrs:{href:"#remote-partitioning"}},[e._v("#")]),e._v(" Remote Partitioning")]),e._v(" "),t("p",[t("img",{attrs:{src:"https://docs.spring.io/spring-batch/docs/current/reference/html/images/remote-partitioning.png",alt:"Remote Partitioning"}})]),e._v(" "),t("p",[e._v("Figure 5. Remote Partitioning")]),e._v(" "),t("p",[e._v("Remote Partitioning, on the other hand, is useful when it\nis not the processing of items but rather the associated I/O that\ncauses the bottleneck. Using Remote Partitioning, work can\nbe farmed out to workers that execute complete Spring Batch\nsteps. 
Thus, each worker has its own "),t("code",[e._v("ItemReader")]),e._v(", "),t("code",[e._v("ItemProcessor")]),e._v(", and "),t("code",[e._v("ItemWriter")]),e._v(". For this purpose, Spring Batch\nIntegration provides the "),t("code",[e._v("MessageChannelPartitionHandler")]),e._v(".")]),e._v(" "),t("p",[e._v("This implementation of the "),t("code",[e._v("PartitionHandler")]),e._v(" interface uses "),t("code",[e._v("MessageChannel")]),e._v(" instances to\nsend instructions to remote workers and receive their responses.\nThis provides a nice abstraction from the transports (such as JMS\nand AMQP) being used to communicate with the remote workers.")]),e._v(" "),t("p",[e._v('The section of the "Scalability" chapter that addresses '),t("RouterLink",{attrs:{to:"/en/spring-batch/scalability.html#partitioning"}},[e._v("remote partitioning")]),e._v(" provides an overview of the concepts and\ncomponents needed to configure remote partitioning and shows an\nexample of using the default "),t("code",[e._v("TaskExecutorPartitionHandler")]),e._v(" to partition\nin separate local threads of execution. For remote partitioning\nto multiple JVMs, two additional components are required:")],1),e._v(" "),t("ul",[t("li",[t("p",[e._v("A remoting fabric or grid environment")])]),e._v(" "),t("li",[t("p",[e._v("A "),t("code",[e._v("PartitionHandler")]),e._v(" implementation that supports the desired\nremoting fabric or grid environment")])])]),e._v(" "),t("p",[e._v("Similar to remote chunking, JMS can be used as the “remoting fabric”. 
In that case, use\na "),t("code",[e._v("MessageChannelPartitionHandler")]),e._v(" instance as the "),t("code",[e._v("PartitionHandler")]),e._v(" implementation,\nas described earlier.")]),e._v(" "),t("p",[e._v("The following example assumes an existing partitioned job and focuses on the"),t("code",[e._v("MessageChannelPartitionHandler")]),e._v(" and JMS configuration in XML:")]),e._v(" "),t("p",[e._v("XML Configuration")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\n \n \n \n \n \n \n \n \n \n\n\n\n\n\n\n\n\n\n \n \n\n\n\n\n\n\n\n\n\n\n\n\n\n \n\n\n\n')])])]),t("p",[e._v("The following example assumes an existing partitioned job and focuses on the"),t("code",[e._v("MessageChannelPartitionHandler")]),e._v(" and JMS configuration in Java:")]),e._v(" "),t("p",[e._v("Java Configuration")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('/*\n * Configuration of the manager side\n */\n@Bean\npublic PartitionHandler partitionHandler() {\n MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();\n partitionHandler.setStepName("step1");\n partitionHandler.setGridSize(3);\n partitionHandler.setReplyChannel(outboundReplies());\n MessagingTemplate template = new MessagingTemplate();\n template.setDefaultChannel(outboundRequests());\n template.setReceiveTimeout(100000);\n partitionHandler.setMessagingOperations(template);\n return partitionHandler;\n}\n\n@Bean\npublic QueueChannel outboundReplies() {\n return new QueueChannel();\n}\n\n@Bean\npublic DirectChannel outboundRequests() {\n return new DirectChannel();\n}\n\n@Bean\npublic IntegrationFlow outboundJmsRequests() {\n return IntegrationFlows.from("outboundRequests")\n .handle(Jms.outboundGateway(connectionFactory())\n .requestDestination("requestsQueue"))\n .get();\n}\n\n@Bean\n@ServiceActivator(inputChannel = "inboundStaging")\npublic 
AggregatorFactoryBean partitioningMessageHandler() throws Exception {\n AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();\n aggregatorFactoryBean.setProcessorBean(partitionHandler());\n aggregatorFactoryBean.setOutputChannel(outboundReplies());\n // configure other properties of the aggregatorFactoryBean\n return aggregatorFactoryBean;\n}\n\n@Bean\npublic DirectChannel inboundStaging() {\n return new DirectChannel();\n}\n\n@Bean\npublic IntegrationFlow inboundJmsStaging() {\n return IntegrationFlows\n .from(Jms.messageDrivenChannelAdapter(connectionFactory())\n .configureListenerContainer(c -> c.subscriptionDurable(false))\n .destination("stagingQueue"))\n .channel(inboundStaging())\n .get();\n}\n\n/*\n * Configuration of the worker side\n */\n@Bean\npublic StepExecutionRequestHandler stepExecutionRequestHandler() {\n StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();\n stepExecutionRequestHandler.setJobExplorer(jobExplorer);\n stepExecutionRequestHandler.setStepLocator(stepLocator());\n return stepExecutionRequestHandler;\n}\n\n@Bean\n@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")\npublic StepExecutionRequestHandler serviceActivator() throws Exception {\n return stepExecutionRequestHandler();\n}\n\n@Bean\npublic DirectChannel inboundRequests() {\n return new DirectChannel();\n}\n\npublic IntegrationFlow inboundJmsRequests() {\n return IntegrationFlows\n .from(Jms.messageDrivenChannelAdapter(connectionFactory())\n .configureListenerContainer(c -> c.subscriptionDurable(false))\n .destination("requestsQueue"))\n .channel(inboundRequests())\n .get();\n}\n\n@Bean\npublic DirectChannel outboundStaging() {\n return new DirectChannel();\n}\n\n@Bean\npublic IntegrationFlow outboundJmsStaging() {\n return IntegrationFlows.from("outboundStaging")\n .handle(Jms.outboundGateway(connectionFactory())\n .requestDestination("stagingQueue"))\n 
.get();\n}\n')])])]),t("p",[e._v("You must also ensure that the partition "),t("code",[e._v("handler")]),e._v(" attribute maps to the "),t("code",[e._v("partitionHandler")]),e._v(" bean.")]),e._v(" "),t("p",[e._v("The following example maps the partition "),t("code",[e._v("handler")]),e._v(" attribute to the "),t("code",[e._v("partitionHandler")]),e._v(" in\nXML:")]),e._v(" "),t("p",[e._v("XML Configuration")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\n \n \n ...\n \n\n')])])]),t("p",[e._v("The following example maps the partition "),t("code",[e._v("handler")]),e._v(" attribute to the "),t("code",[e._v("partitionHandler")]),e._v(" in\nJava:")]),e._v(" "),t("p",[e._v("Java Configuration")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('\tpublic Job personJob() {\n\t\treturn jobBuilderFactory.get("personJob")\n\t\t\t\t.start(stepBuilderFactory.get("step1.manager")\n\t\t\t\t\t\t.partitioner("step1.worker", partitioner())\n\t\t\t\t\t\t.partitionHandler(partitionHandler())\n\t\t\t\t\t\t.build())\n\t\t\t\t.build();\n\t}\n')])])]),t("p",[e._v("You can find a complete example of a remote partitioning job "),t("a",{attrs:{href:"https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples#remote-partitioning-sample",target:"_blank",rel:"noopener noreferrer"}},[e._v("here"),t("OutboundLink")],1),e._v(".")]),e._v(" "),t("p",[e._v("The "),t("code",[e._v("@EnableBatchIntegration")]),e._v(" annotation can be used to simplify a remote\npartitioning setup. 
This annotation provides two beans useful for remote partitioning:")]),e._v(" "),t("ul",[t("li",[t("p",[t("code",[e._v("RemotePartitioningManagerStepBuilderFactory")]),e._v(": used to configure the manager step")])]),e._v(" "),t("li",[t("p",[t("code",[e._v("RemotePartitioningWorkerStepBuilderFactory")]),e._v(": used to configure the worker step")])])]),e._v(" "),t("p",[e._v("These APIs take care of configuring a number of components as described in the following diagram:")]),e._v(" "),t("p",[t("img",{attrs:{src:"https://docs.spring.io/spring-batch/docs/current/reference/html/images/remote-partitioning-polling-config.png",alt:"Remote Partitioning Configuration (with job repository polling)"}})]),e._v(" "),t("p",[e._v("Figure 6. Remote Partitioning Configuration (with job repository polling)")]),e._v(" "),t("p",[t("img",{attrs:{src:"https://docs.spring.io/spring-batch/docs/current/reference/html/images/remote-partitioning-aggregation-config.png",alt:"Remote Partitioning Configuration (with replies aggregation)"}})]),e._v(" "),t("p",[e._v("Figure 7. 
Remote Partitioning Configuration (with replies aggregation)")]),e._v(" "),t("p",[e._v("On the manager side, the "),t("code",[e._v("RemotePartitioningManagerStepBuilderFactory")]),e._v(" allows you to\nconfigure a manager step by declaring:")]),e._v(" "),t("ul",[t("li",[t("p",[e._v("the "),t("code",[e._v("Partitioner")]),e._v(" used to partition data")])]),e._v(" "),t("li",[t("p",[e._v('the output channel ("Outgoing requests") to send requests to workers')])]),e._v(" "),t("li",[t("p",[e._v('the input channel ("Incoming replies") to receive replies from workers (when configuring replies aggregation)')])]),e._v(" "),t("li",[t("p",[e._v("the poll interval and timeout parameters (when configuring job repository polling)")])])]),e._v(" "),t("p",[e._v("The "),t("code",[e._v("MessageChannelPartitionHandler")]),e._v(" and the "),t("code",[e._v("MessagingTemplate")]),e._v(" do not need to be explicitly configured\n(although they can still be explicitly configured if required).")]),e._v(" "),t("p",[e._v("On the worker side, the "),t("code",[e._v("RemotePartitioningWorkerStepBuilderFactory")]),e._v(" allows you to configure a worker to:")]),e._v(" "),t("ul",[t("li",[t("p",[e._v('listen to requests sent by the manager on the input channel ("Incoming requests")')])]),e._v(" "),t("li",[t("p",[e._v("call the "),t("code",[e._v("handle")]),e._v(" method of "),t("code",[e._v("StepExecutionRequestHandler")]),e._v(" for each request")])]),e._v(" "),t("li",[t("p",[e._v('send replies on the output channel ("Outgoing replies") to the manager')])])]),e._v(" "),t("p",[e._v("There is no need to explicitly configure the "),t("code",[e._v("StepExecutionRequestHandler")]),e._v(" (which can be explicitly configured if required).")]),e._v(" "),t("p",[e._v("The following example shows how to use these APIs:")]),e._v(" "),t("div",{staticClass:"language- extra-class"},[t("pre",{pre:!0,attrs:{class:"language-text"}},[t("code",[e._v('@Configuration\n@EnableBatchProcessing\n@EnableBatchIntegration\npublic 
class RemotePartitioningJobConfiguration {\n\n @Configuration\n public static class ManagerConfiguration {\n\n @Autowired\n private RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;\n\n @Bean\n public Step managerStep() {\n return this.managerStepBuilderFactory\n .get("managerStep")\n .partitioner("workerStep", partitioner())\n .gridSize(10)\n .outputChannel(outgoingRequestsToWorkers())\n .inputChannel(incomingRepliesFromWorkers())\n .build();\n }\n\n // Middleware beans setup omitted\n\n }\n\n @Configuration\n public static class WorkerConfiguration {\n\n @Autowired\n private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;\n\n @Bean\n public Step workerStep() {\n return this.workerStepBuilderFactory\n .get("workerStep")\n .inputChannel(incomingRequestsFromManager())\n .outputChannel(outgoingRepliesToManager())\n .chunk(100)\n .reader(itemReader())\n .processor(itemProcessor())\n .writer(itemWriter())\n .build();\n }\n\n // Middleware beans setup omitted\n\n }\n\n}\n')])])])])}),[],!1,null,null,null);n.default=o.exports}}]);