(window.webpackJsonp=window.webpackJsonp||[]).push([[127],{570:function(e,t,a){"use strict";a.r(t);var n=a(56),r=Object(n.a)({},(function(){var e=this,t=e.$createElement,a=e._self._c||t;return a("ContentSlotsDistributor",{attrs:{"slot-key":e.$parent.slotKey}},[a("h1",{attrs:{id:"message-routing"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#message-routing"}},[e._v("#")]),e._v(" Message Routing")]),e._v(" "),a("h2",{attrs:{id:"message-routing-2"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#message-routing-2"}},[e._v("#")]),e._v(" Message Routing")]),e._v(" "),a("p",[e._v("This chapter covers the details of using Spring Integration to route messages.")]),e._v(" "),a("h3",{attrs:{id:"routers"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#routers"}},[e._v("#")]),e._v(" Routers")]),e._v(" "),a("p",[e._v("This section covers how routers work.\nIt includes the following topics:")]),e._v(" "),a("ul",[a("li",[a("p",[a("a",{attrs:{href:"#router-overview"}},[e._v("Overview")])])]),e._v(" "),a("li",[a("p",[a("a",{attrs:{href:"#router-common-parameters"}},[e._v("Common Router Parameters")])])]),e._v(" "),a("li",[a("p",[a("a",{attrs:{href:"#router-implementations"}},[e._v("Router Implementations")])])]),e._v(" "),a("li",[a("p",[a("a",{attrs:{href:"#router-namespace"}},[e._v("Configuring a Generic Router")])])]),e._v(" "),a("li",[a("p",[a("a",{attrs:{href:"#router-spel"}},[e._v("Routers and the Spring Expression Language (SpEL)")])])]),e._v(" "),a("li",[a("p",[a("a",{attrs:{href:"#dynamic-routers"}},[e._v("Dynamic Routers")])])])]),e._v(" "),a("h4",{attrs:{id:"overview"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#overview"}},[e._v("#")]),e._v(" Overview")]),e._v(" "),a("p",[e._v("Routers are a crucial element in many messaging architectures.\nThey consume messages from a message channel and forward each consumed message to one or more different message channels depending on a set of conditions.")]),e._v(" "),a("p",[e._v("Spring Integration provides the 
following routers:")]),e._v(" "),a("ul",[a("li",[a("p",[a("a",{attrs:{href:"#router-implementations-payloadtyperouter"}},[e._v("Payload Type Router")])])]),e._v(" "),a("li",[a("p",[a("a",{attrs:{href:"#router-implementations-headervaluerouter"}},[e._v("Header Value Router")])])]),e._v(" "),a("li",[a("p",[a("a",{attrs:{href:"#router-implementations-recipientlistrouter"}},[e._v("Recipient List Router")])])]),e._v(" "),a("li",[a("p",[a("RouterLink",{attrs:{to:"/en/spring-integration/xml.html#xml-xpath-routing"}},[e._v("XPath Router (part of the XML module)")])],1)]),e._v(" "),a("li",[a("p",[a("a",{attrs:{href:"#router-implementations-exception-router"}},[e._v("Error Message Exception Type Router")])])]),e._v(" "),a("li",[a("p",[a("a",{attrs:{href:"#router-namespace"}},[e._v("(Generic) Router")])])])]),e._v(" "),a("p",[e._v("Router implementations share many configuration parameters.\nHowever, certain differences exist between routers.\nFurthermore, the availability of configuration parameters depends on whether routers are used inside or outside of a chain.\nIn order to provide a quick overview, all available attributes are listed in the two following tables .")]),e._v(" "),a("p",[e._v("The following table shows the configuration parameters available for a router outside of a chain:")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th",[e._v("Attribute")]),e._v(" "),a("th",[e._v("router")]),e._v(" "),a("th",[e._v("header value router")]),e._v(" "),a("th",[e._v("xpath router")]),e._v(" "),a("th",[e._v("payload type router")]),e._v(" "),a("th",[e._v("recipient list route")]),e._v(" "),a("th",[e._v("exception type router")])])]),e._v(" "),a("tbody",[a("tr",[a("td",[e._v("apply-sequence")]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" 
"),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})])]),e._v(" "),a("tr",[a("td",[e._v("default-output-channel")]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})])]),e._v(" "),a("tr",[a("td",[e._v("resolution-required")]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" 
"),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})])]),e._v(" "),a("tr",[a("td",[e._v("ignore-send-failures")]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})])]),e._v(" "),a("tr",[a("td",[e._v("timeout")]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" 
"),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})])]),e._v(" "),a("tr",[a("td",[e._v("id")]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})])]),e._v(" "),a("tr",[a("td",[e._v("auto-startup")]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" 
"),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})])]),e._v(" "),a("tr",[a("td",[e._v("input-channel")]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})])]),e._v(" "),a("tr",[a("td",[e._v("order")]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" 
"),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})])]),e._v(" "),a("tr",[a("td",[e._v("method")]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td")]),e._v(" "),a("tr",[a("td",[e._v("ref")]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td")]),e._v(" "),a("tr",[a("td",[e._v("expression")]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td")]),e._v(" "),a("tr",[a("td",[e._v("header-name")]),e._v(" "),a("td"),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td")]),e._v(" "),a("tr",[a("td",[e._v("evaluate-as-string")]),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" 
"),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td")]),e._v(" "),a("tr",[a("td",[e._v("xpath-expression-ref")]),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td")]),e._v(" "),a("tr",[a("td",[e._v("converter")]),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td")])])]),e._v(" "),a("p",[e._v("The following table shows the configuration parameters available for a router inside of a chain:")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th",[e._v("Attribute")]),e._v(" "),a("th",[e._v("router")]),e._v(" "),a("th",[e._v("header value router")]),e._v(" "),a("th",[e._v("xpath router")]),e._v(" "),a("th",[e._v("payload type router")]),e._v(" "),a("th",[e._v("recipient list router")]),e._v(" "),a("th",[e._v("exception type router")])])]),e._v(" "),a("tbody",[a("tr",[a("td",[e._v("apply-sequence")]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" 
"),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})])]),e._v(" "),a("tr",[a("td",[e._v("default-output-channel")]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})])]),e._v(" "),a("tr",[a("td",[e._v("resolution-required")]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" 
"),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})])]),e._v(" "),a("tr",[a("td",[e._v("ignore-send-failures")]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})])]),e._v(" "),a("tr",[a("td",[e._v("timeout")]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" 
"),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})])]),e._v(" "),a("tr",[a("td",[e._v("id")]),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td")]),e._v(" "),a("tr",[a("td",[e._v("auto-startup")]),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td")]),e._v(" "),a("tr",[a("td",[e._v("input-channel")]),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td")]),e._v(" "),a("tr",[a("td",[e._v("order")]),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td")]),e._v(" "),a("tr",[a("td",[e._v("method")]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td")]),e._v(" "),a("tr",[a("td",[e._v("ref")]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td")]),e._v(" "),a("tr",[a("td",[e._v("expression")]),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td")]),e._v(" "),a("tr",[a("td",[e._v("header-name")]),e._v(" "),a("td"),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" 
"),a("td"),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td")]),e._v(" "),a("tr",[a("td",[e._v("evaluate-as-string")]),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td")]),e._v(" "),a("tr",[a("td",[e._v("xpath-expression-ref")]),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td")]),e._v(" "),a("tr",[a("td",[e._v("converter")]),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td",[a("img",{attrs:{src:"https://docs.spring.io/spring-integration/docs/current/reference/html/images/tickmark.png",alt:"tickmark"}})]),e._v(" "),a("td"),e._v(" "),a("td"),e._v(" "),a("td")])])]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("As of Spring Integration 2.1, router parameters have been more standardized across all router implementations."),a("br"),e._v("Consequently, a few minor changes may break older Spring Integration based applications."),a("br"),a("br"),e._v("Since Spring Integration 2.1, the "),a("code",[e._v("ignore-channel-name-resolution-failures")]),e._v(" attribute is removed in favor of consolidating its behavior with the "),a("code",[e._v("resolution-required")]),e._v(" attribute."),a("br"),e._v("Also, the "),a("code",[e._v("resolution-required")]),e._v(" attribute now defaults to "),a("code",[e._v("true")]),e._v("."),a("br"),a("br"),e._v("Prior to these changes, the "),a("code",[e._v("resolution-required")]),e._v(" attribute defaulted to "),a("code",[e._v("false")]),e._v(", causing messages to be silently dropped when no channel was resolved and no "),a("code",[e._v("default-output-channel")]),e._v(" was set."),a("br"),e._v("The new behavior requires at least one 
resolved channel and, by default, throws a "),a("code",[e._v("MessageDeliveryException")]),e._v(" if no channel was determined (or an attempt to send was not successful)."),a("br"),a("br"),e._v("If you do desire to drop messages silently, you can set "),a("code",[e._v('default-output-channel="nullChannel"')]),e._v(".")])])]),e._v(" "),a("tbody")]),e._v(" "),a("h4",{attrs:{id:"common-router-parameters"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#common-router-parameters"}},[e._v("#")]),e._v(" Common Router Parameters")]),e._v(" "),a("p",[e._v("This section describes the parameters common to all router parameters (the parameters with all their boxes ticked in the two tables shown earlier in this chapter).")]),e._v(" "),a("h5",{attrs:{id:"inside-and-outside-of-a-chain"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#inside-and-outside-of-a-chain"}},[e._v("#")]),e._v(" Inside and Outside of a Chain")]),e._v(" "),a("p",[e._v("The following parameters are valid for all routers inside and outside of chains.")]),e._v(" "),a("p",[a("code",[e._v("apply-sequence")])]),e._v(" "),a("p",[e._v("This attribute specifies whether sequence number and size headers should be added to each message.\nThis optional attribute defaults to "),a("code",[e._v("false")]),e._v(".")]),e._v(" "),a("p",[a("code",[e._v("default-output-channel")])]),e._v(" "),a("p",[e._v("If set, this attribute provides a reference to the channel where messages should be sent if channel resolution fails to return any channels.\nIf no default output channel is provided, the router throws an exception.\nIf you would like to silently drop those messages instead, set the default output channel attribute value to "),a("code",[e._v("nullChannel")]),e._v(".")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("A message is sent only to the "),a("code",[e._v("default-output-channel")]),e._v(" if "),a("code",[e._v("resolution-required")]),e._v(" is "),a("code",[e._v("false")]),e._v(" and the 
channel is not resolved.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[a("code",[e._v("resolution-required")])]),e._v(" "),a("p",[e._v("This attribute specifies whether channel names must always be successfully resolved to channel instances that exist.\nIf set to "),a("code",[e._v("true")]),e._v(", a "),a("code",[e._v("MessagingException")]),e._v(" is raised when the channel cannot be resolved.\nSetting this attribute to "),a("code",[e._v("false")]),e._v(" causes any unresovable channels to be ignored.\nThis optional attribute defaults to "),a("code",[e._v("true")]),e._v(".")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("A Message is sent only to the "),a("code",[e._v("default-output-channel")]),e._v(", if specified, when "),a("code",[e._v("resolution-required")]),e._v(" is "),a("code",[e._v("false")]),e._v(" and the channel is not resolved.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[a("code",[e._v("ignore-send-failures")])]),e._v(" "),a("p",[e._v("If set to "),a("code",[e._v("true")]),e._v(", failures to send to a message channel is ignored.\nIf set to "),a("code",[e._v("false")]),e._v(", a "),a("code",[e._v("MessageDeliveryException")]),e._v(" is thrown instead, and, if the router resolves more than one channel, any subsequent channels do not receive the message.")]),e._v(" "),a("p",[e._v("The exact behavior of this attribute depends on the type of the "),a("code",[e._v("Channel")]),e._v(" to which the messages are sent.\nFor example, when using direct channels (single threaded), send failures can be caused by exceptions thrown by components much further downstream.\nHowever, when sending messages to a simple queue channel (asynchronous), the likelihood of an exception to be thrown is rather remote.")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("While most routers route to a single channel, they can return more than one channel name."),a("br"),e._v("The 
"),a("code",[e._v("recipient-list-router")]),e._v(", for instance, does exactly that."),a("br"),e._v("If you set this attribute to "),a("code",[e._v("true")]),e._v(" on a router that only routes to a single channel, any caused exception is swallowed, which usually makes little sense."),a("br"),e._v("In that case, it would be better to catch the exception in an error flow at the flow entry point."),a("br"),e._v("Therefore, setting the "),a("code",[e._v("ignore-send-failures")]),e._v(" attribute to "),a("code",[e._v("true")]),e._v(" usually makes more sense when the router implementation returns more than one channel name, because the other channel(s) following the one that fails would still receive the message.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("This attribute defaults to "),a("code",[e._v("false")]),e._v(".")]),e._v(" "),a("p",[a("code",[e._v("timeout")])]),e._v(" "),a("p",[e._v("The "),a("code",[e._v("timeout")]),e._v(" attribute specifies the maximum amount of time in milliseconds to wait when sending messages to the target Message Channels.\nBy default, the send operation blocks indefinitely.")]),e._v(" "),a("h5",{attrs:{id:""}},[a("a",{staticClass:"header-anchor",attrs:{href:"#"}},[e._v("#")])]),e._v(" "),a("p",[e._v("The following parameters are valid only across all top-level routers that are outside of chains.")]),e._v(" "),a("p",[a("code",[e._v("id")])]),e._v(" "),a("p",[e._v("Identifies the underlying Spring bean definition, which, in the case of routers, is an instance of "),a("code",[e._v("EventDrivenConsumer")]),e._v(" or "),a("code",[e._v("PollingConsumer")]),e._v(", depending on whether the router’s "),a("code",[e._v("input-channel")]),e._v(" is a "),a("code",[e._v("SubscribableChannel")]),e._v(" or a "),a("code",[e._v("PollableChannel")]),e._v(", respectively.\nThis is an optional attribute.")]),e._v(" "),a("p",[a("code",[e._v("auto-startup")])]),e._v(" "),a("p",[e._v("This “lifecycle” attribute signaled whether this component 
should be started during startup of the application context.\nThis optional attribute defaults to "),a("code",[e._v("true")]),e._v(".")]),e._v(" "),a("p",[a("code",[e._v("input-channel")])]),e._v(" "),a("p",[e._v("The receiving message channel of this endpoint.")]),e._v(" "),a("p",[a("code",[e._v("order")])]),e._v(" "),a("p",[e._v("This attribute defines the order for invocation when this endpoint is connected as a subscriber to a channel.\nThis is particularly relevant when that channel uses a failover dispatching strategy.\nIt has no effect when this endpoint itself is a polling consumer for a channel with a queue.")]),e._v(" "),a("h4",{attrs:{id:"router-implementations"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#router-implementations"}},[e._v("#")]),e._v(" Router Implementations")]),e._v(" "),a("p",[e._v("Since content-based routing often requires some domain-specific logic, most use cases require Spring Integration’s options for delegating to POJOs by using either the XML namespace support or annotations.\nBoth of these are discussed later.\nHowever, we first present a couple of implementations that fulfill common requirements.")]),e._v(" "),a("h5",{attrs:{id:"payloadtyperouter"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#payloadtyperouter"}},[e._v("#")]),e._v(" "),a("code",[e._v("PayloadTypeRouter")])]),e._v(" "),a("p",[e._v("A "),a("code",[e._v("PayloadTypeRouter")]),e._v(" sends messages to the channel defined by payload-type mappings, as the following example shows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n \n \n \n \n \n \n\n')])])]),a("p",[e._v("Configuration of the "),a("code",[e._v("PayloadTypeRouter")]),e._v(" is also supported by the namespace provided by Spring Integration (see "),a("code",[e._v("[Namespace Support](./configuration.html#configuration-namespace)")]),e._v("), which essentially simplifies configuration by combining the 
"),a("code",[e._v("")]),e._v(" configuration and its corresponding implementation (defined by using a "),a("code",[e._v("")]),e._v(" element) into a single and more concise configuration element.\nThe following example shows a "),a("code",[e._v("PayloadTypeRouter")]),e._v(" configuration that is equivalent to the one above but uses the namespace support:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n \n \n\n')])])]),a("p",[e._v("The following example shows the equivalent router configured in Java:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@ServiceActivator(inputChannel = "routingChannel")\n@Bean\npublic PayloadTypeRouter router() {\n PayloadTypeRouter router = new PayloadTypeRouter();\n router.setChannelMapping(String.class.getName(), "stringChannel");\n router.setChannelMapping(Integer.class.getName(), "integerChannel");\n return router;\n}\n')])])]),a("p",[e._v("When using the Java DSL, there are two options.")]),e._v(" "),a("p",[e._v("First, you can define the router object as shown in the preceding example:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@Bean\npublic IntegrationFlow routerFlow1() {\n return IntegrationFlows.from("routingChannel")\n .route(router())\n .get();\n}\n\npublic PayloadTypeRouter router() {\n PayloadTypeRouter router = new PayloadTypeRouter();\n router.setChannelMapping(String.class.getName(), "stringChannel");\n router.setChannelMapping(Integer.class.getName(), "integerChannel");\n return router;\n}\n')])])]),a("p",[e._v("Note that the router can be, but does not have to be, a "),a("code",[e._v("@Bean")]),e._v(".\nThe flow registers it if it is not a "),a("code",[e._v("@Bean")]),e._v(".")]),e._v(" "),a("p",[e._v("Second, you can define the routing function within the DSL flow itself, as the 
following example shows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@Bean\npublic IntegrationFlow routerFlow2() {\n return IntegrationFlows.from("routingChannel")\n .>route(Object::getClass, m -> m\n .channelMapping(String.class, "stringChannel")\n .channelMapping(Integer.class, "integerChannel"))\n .get();\n}\n')])])]),a("h5",{attrs:{id:"headervaluerouter"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#headervaluerouter"}},[e._v("#")]),e._v(" "),a("code",[e._v("HeaderValueRouter")])]),e._v(" "),a("p",[e._v("A "),a("code",[e._v("HeaderValueRouter")]),e._v(" sends Messages to the channel based on the individual header value mappings.\nWhen a "),a("code",[e._v("HeaderValueRouter")]),e._v(" is created, it is initialized with the name of the header to be evaluated.\nThe value of the header could be one of two things:")]),e._v(" "),a("ul",[a("li",[a("p",[e._v("An arbitrary value")])]),e._v(" "),a("li",[a("p",[e._v("A channel name")])])]),e._v(" "),a("p",[e._v("If it is an arbitrary value, additional mappings for these header values to channel names are required.\nOtherwise, no additional configuration is needed.")]),e._v(" "),a("p",[e._v("Spring Integration provides a simple namespace-based XML configuration to configure a "),a("code",[e._v("HeaderValueRouter")]),e._v(".\nThe following example demonstrates configuration for the "),a("code",[e._v("HeaderValueRouter")]),e._v(" when mapping of header values to channels is required:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n \n \n\n')])])]),a("p",[e._v("During the resolution process, the router defined in the preceding example may encounter channel resolution failures, causing an exception.\nIf you want to suppress such exceptions and send unresolved messages to the default output channel (identified with the 
"),a("code",[e._v("default-output-channel")]),e._v(" attribute) set "),a("code",[e._v("resolution-required")]),e._v(" to "),a("code",[e._v("false")]),e._v(".")]),e._v(" "),a("p",[e._v("Normally, messages for which the header value is not explicitly mapped to a channel are sent to the "),a("code",[e._v("default-output-channel")]),e._v(".\nHowever, when the header value is mapped to a channel name but the channel cannot be resolved, setting the "),a("code",[e._v("resolution-required")]),e._v(" attribute to "),a("code",[e._v("false")]),e._v(" results in routing such messages to the "),a("code",[e._v("default-output-channel")]),e._v(".")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("As of Spring Integration 2.1, the attribute was changed from "),a("code",[e._v("ignore-channel-name-resolution-failures")]),e._v(" to "),a("code",[e._v("resolution-required")]),e._v("."),a("br"),e._v("Attribute "),a("code",[e._v("resolution-required")]),e._v(" defaults to "),a("code",[e._v("true")]),e._v(".")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("The following example shows the equivalent router configured in Java:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@ServiceActivator(inputChannel = "routingChannel")\n@Bean\npublic HeaderValueRouter router() {\n HeaderValueRouter router = new HeaderValueRouter("testHeader");\n router.setChannelMapping("someHeaderValue", "channelA");\n router.setChannelMapping("someOtherHeaderValue", "channelB");\n return router;\n}\n')])])]),a("p",[e._v("When using the Java DSL, there are two options.\nFirst, you can define the router object as shown in the preceding example:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@Bean\npublic IntegrationFlow routerFlow1() {\n return IntegrationFlows.from("routingChannel")\n .route(router())\n .get();\n}\n\npublic 
HeaderValueRouter router() {\n HeaderValueRouter router = new HeaderValueRouter("testHeader");\n router.setChannelMapping("someHeaderValue", "channelA");\n router.setChannelMapping("someOtherHeaderValue", "channelB");\n return router;\n}\n')])])]),a("p",[e._v("Note that the router can be, but does not have to be, a "),a("code",[e._v("@Bean")]),e._v(".\nThe flow registers it if it is not a "),a("code",[e._v("@Bean")]),e._v(".")]),e._v(" "),a("p",[e._v("Second, you can define the routing function within the DSL flow itself, as the following example shows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@Bean\npublic IntegrationFlow routerFlow2() {\n return IntegrationFlows.from("routingChannel")\n .route(Message.class, m -> m.getHeaders().get("testHeader", String.class),\n m -> m\n .channelMapping("someHeaderValue", "channelA")\n .channelMapping("someOtherHeaderValue", "channelB"),\n e -> e.id("headerValueRouter"))\n .get();\n}\n')])])]),a("p",[e._v("Configuration where mapping of header values to channel names is not required, because header values themselves represent channel names.\nThe following example shows a router that does not require mapping of header values to channel names:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n')])])]),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("Since Spring Integration 2.1, the behavior of resolving channels is more explicit."),a("br"),e._v("For example, if you omit the "),a("code",[e._v("default-output-channel")]),e._v(" attribute, the router was unable to resolve at least one valid channel, and any channel name resolution failures were ignored by setting "),a("code",[e._v("resolution-required")]),e._v(" to "),a("code",[e._v("false")]),e._v(", then a "),a("code",[e._v("MessageDeliveryException")]),e._v(" is thrown."),a("br"),a("br"),e._v("Basically, by 
default, the router must be able to route messages successfully to at least one channel."),a("br"),e._v("If you really want to drop messages, you must also have "),a("code",[e._v("default-output-channel")]),e._v(" set to "),a("code",[e._v("nullChannel")]),e._v(".")])])]),e._v(" "),a("tbody")]),e._v(" "),a("h5",{attrs:{id:"recipientlistrouter"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#recipientlistrouter"}},[e._v("#")]),e._v(" "),a("code",[e._v("RecipientListRouter")])]),e._v(" "),a("p",[e._v("A "),a("code",[e._v("RecipientListRouter")]),e._v(" sends each received message to a statically defined list of message channels.\nThe following example creates a "),a("code",[e._v("RecipientListRouter")]),e._v(":")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n \n \n \n \n \n \n \n\n')])])]),a("p",[e._v("Spring Integration also provides namespace support for the "),a("code",[e._v("RecipientListRouter")]),e._v(" configuration (see "),a("RouterLink",{attrs:{to:"/en/spring-integration/configuration.html#configuration-namespace"}},[e._v("Namespace Support")]),e._v(") as the following example shows:")],1),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n \n \n\n')])])]),a("p",[e._v("The following example shows the equivalent router configured in Java:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@ServiceActivator(inputChannel = "routingChannel")\n@Bean\npublic RecipientListRouter router() {\n RecipientListRouter router = new RecipientListRouter();\n router.setSendTimeout(1_234L);\n router.setIgnoreSendFailures(true);\n router.setApplySequence(true);\n router.addRecipient("channel1");\n router.addRecipient("channel2");\n router.addRecipient("channel3");\n return router;\n}\n')])])]),a("p",[e._v("The following example shows the equivalent 
router configured by using the Java DSL:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@Bean\npublic IntegrationFlow routerFlow() {\n return IntegrationFlows.from("routingChannel")\n .routeToRecipients(r -> r\n .applySequence(true)\n .ignoreSendFailures(true)\n .recipient("channel1")\n .recipient("channel2")\n .recipient("channel3")\n .sendTimeout(1_234L))\n .get();\n}\n')])])]),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("The 'apply-sequence' flag here has the same effect as it does for a publish-subscribe-channel, and, as with a publish-subscribe-channel, it is disabled by default on the "),a("code",[e._v("recipient-list-router")]),e._v("."),a("br"),e._v("See "),a("RouterLink",{attrs:{to:"/en/spring-integration/channel.html#channel-configuration-pubsubchannel"}},[a("code",[e._v("PublishSubscribeChannel")]),e._v(" Configuration")]),e._v(" for more information.")],1)])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("Another convenient option when configuring a "),a("code",[e._v("RecipientListRouter")]),e._v(" is to use Spring Expression Language (SpEL) support as selectors for individual recipient channels.\nDoing so is similar to using a filter at the beginning of a 'chain' to act as a “selective consumer”.\nHowever, in this case, it is all combined rather concisely into the router’s configuration, as the following example shows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n \n \n\n')])])]),a("p",[e._v("In the preceding configuration, a SpEL expression identified by the "),a("code",[e._v("selector-expression")]),e._v(" attribute is evaluated to determine whether this recipient should be included in the recipient list for a given input message.\nThe evaluation result of the expression must be a boolean.\nIf this attribute is not defined, the channel is always among the list of 
recipients.")]),e._v(" "),a("h5",{attrs:{id:"recipientlistroutermanagement"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#recipientlistroutermanagement"}},[e._v("#")]),e._v(" "),a("code",[e._v("RecipientListRouterManagement")])]),e._v(" "),a("p",[e._v("Starting with version 4.1, the "),a("code",[e._v("RecipientListRouter")]),e._v(" provides several operations to manipulate recipients dynamically at runtime.\nThese management operations are presented by "),a("code",[e._v("RecipientListRouterManagement")]),e._v(" through the "),a("code",[e._v("@ManagedResource")]),e._v(" annotation.\nThey are available by using "),a("RouterLink",{attrs:{to:"/en/spring-integration/control-bus.html#control-bus"}},[e._v("Control Bus")]),e._v(" as well as by using JMX, as the following example shows:")],1),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n\n\n \n\n\n\n')])])]),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("messagingTemplate.convertAndSend(controlBus, \"@'simpleRouter.handler'.addRecipient('channel2')\");\n")])])]),a("p",[e._v("From the application start up the "),a("code",[e._v("simpleRouter")]),e._v(", has only one "),a("code",[e._v("channel1")]),e._v(" recipient.\nBut after the "),a("code",[e._v("addRecipient")]),e._v(" command, "),a("code",[e._v("channel2")]),e._v(" recipient is added.\nIt is a “registering an interest in something that is part of the message” use case, when we may be interested in messages from the router at some time period, so we are subscribing to the "),a("code",[e._v("recipient-list-router")]),e._v(" and, at some point, decide to unsubscribe.")]),e._v(" "),a("p",[e._v("Because of the runtime management operation for the "),a("code",[e._v("")]),e._v(", it can be configured without any "),a("code",[e._v("")]),e._v(" from the start.\nIn this case, the behavior of 
"),a("code",[e._v("RecipientListRouter")]),e._v(" is the same when there is no one matching recipient for the message.\nIf "),a("code",[e._v("defaultOutputChannel")]),e._v(" is configured, the message is sent there.\nOtherwise the "),a("code",[e._v("MessageDeliveryException")]),e._v(" is thrown.")]),e._v(" "),a("h5",{attrs:{id:"xpath-router"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#xpath-router"}},[e._v("#")]),e._v(" XPath Router")]),e._v(" "),a("p",[e._v("The XPath Router is part of the XML Module.\nSee "),a("RouterLink",{attrs:{to:"/en/spring-integration/xml.html#xml-xpath-routing"}},[e._v("Routing XML Messages with XPath")]),e._v(".")],1),e._v(" "),a("h5",{attrs:{id:"routing-and-error-handling"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#routing-and-error-handling"}},[e._v("#")]),e._v(" Routing and Error Handling")]),e._v(" "),a("p",[e._v("Spring Integration also provides a special type-based router called "),a("code",[e._v("ErrorMessageExceptionTypeRouter")]),e._v(" for routing error messages (defined as messages whose "),a("code",[e._v("payload")]),e._v(" is a "),a("code",[e._v("Throwable")]),e._v(" instance)."),a("code",[e._v("ErrorMessageExceptionTypeRouter")]),e._v(" is similar to the "),a("code",[e._v("PayloadTypeRouter")]),e._v(".\nIn fact, they are almost identical.\nThe only difference is that, while "),a("code",[e._v("PayloadTypeRouter")]),e._v(" navigates the instance hierarchy of a payload instance (for example, "),a("code",[e._v("payload.getClass().getSuperclass()")]),e._v(") to find the most specific type and channel mappings, the "),a("code",[e._v("ErrorMessageExceptionTypeRouter")]),e._v(" navigates the hierarchy of 'exception causes' (for example, "),a("code",[e._v("payload.getCause()")]),e._v(") to find the most specific "),a("code",[e._v("Throwable")]),e._v(" type or channel mappings and uses "),a("code",[e._v("mappingClass.isInstance(cause)")]),e._v(" to match the "),a("code",[e._v("cause")]),e._v(" to the class or any 
super class.")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("The channel mapping order in this case matters."),a("br"),e._v("So, if there is a requirement to get mapping for an "),a("code",[e._v("IllegalArgumentException")]),e._v(", but not a "),a("code",[e._v("RuntimeException")]),e._v(", the last one must be configured on router first.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("Since version 4.3 the "),a("code",[e._v("ErrorMessageExceptionTypeRouter")]),e._v(" loads all mapping classes during the initialization phase to fail-fast for a "),a("code",[e._v("ClassNotFoundException")]),e._v(".")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("The following example shows a sample configuration for "),a("code",[e._v("ErrorMessageExceptionTypeRouter")]),e._v(":")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n \n \n\n\n\n\n')])])]),a("h4",{attrs:{id:"configuring-a-generic-router"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#configuring-a-generic-router"}},[e._v("#")]),e._v(" Configuring a Generic Router")]),e._v(" "),a("p",[e._v("Spring Integration provides a generic router.\nYou can use it for general-purpose routing (as opposed to the other routers provided by Spring Integration, each of which has some form of specialization).")]),e._v(" "),a("h5",{attrs:{id:"configuring-a-content-based-router-with-xml"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#configuring-a-content-based-router-with-xml"}},[e._v("#")]),e._v(" Configuring a Content-based Router with XML")]),e._v(" "),a("p",[e._v("The "),a("code",[e._v("router")]),e._v(" element provides a way to connect a router to an input channel and also accepts the optional "),a("code",[e._v("default-output-channel")]),e._v(" attribute.\nThe "),a("code",[e._v("ref")]),e._v(" attribute references the bean name of a custom router implementation 
(which must extend "),a("code",[e._v("AbstractMessageRouter")]),e._v(").\nThe following example shows three generic routers:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n\n\n\n\n\n\n')])])]),a("p",[e._v("Alternatively, "),a("code",[e._v("ref")]),e._v(" may point to a POJO that contains the "),a("code",[e._v("@Router")]),e._v(" annotation (shown later), or you can combine the "),a("code",[e._v("ref")]),e._v(" with an explicit method name.\nSpecifying a method applies the same behavior described in the "),a("code",[e._v("@Router")]),e._v(" annotation section, later in this document.\nThe following example defines a router that points to a POJO in its "),a("code",[e._v("ref")]),e._v(" attribute:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n')])])]),a("p",[e._v("We generally recommend using a "),a("code",[e._v("ref")]),e._v(" attribute if the custom router implementation is referenced in other "),a("code",[e._v("")]),e._v(" definitions.\nHowever if the custom router implementation should be scoped to a single definition of the "),a("code",[e._v("")]),e._v(", you can provide an inner bean definition, as the following example shows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n \n\n')])])]),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("Using both the "),a("code",[e._v("ref")]),e._v(" attribute and an inner handler definition in the same "),a("code",[e._v("")]),e._v(" configuration is not allowed."),a("br"),e._v("Doing so creates an ambiguous condition and throws an exception.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("If the "),a("code",[e._v("ref")]),e._v(" attribute references a bean that extends 
"),a("code",[e._v("AbstractMessageProducingHandler")]),e._v(" (such as routers provided by the framework itself), the configuration is optimized to reference the router directly."),a("br"),e._v("In this case, each "),a("code",[e._v("ref")]),e._v(" attribute must refer to a separate bean instance (or a "),a("code",[e._v("prototype")]),e._v("-scoped bean) or use the inner "),a("code",[e._v("")]),e._v(" configuration type."),a("br"),e._v("However, this optimization applies only if you do not provide any router-specific attributes in the router XML definition."),a("br"),e._v("If you inadvertently reference the same message handler from multiple beans, you get a configuration exception.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("The following example shows the equivalent router configured in Java:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@Bean\n@Router(inputChannel = "routingChannel")\npublic AbstractMessageRouter myCustomRouter() {\n return new AbstractMessageRouter() {\n\n @Override\n protected Collection determineTargetChannels(Message message) {\n return // determine channel(s) for message\n }\n\n };\n}\n')])])]),a("p",[e._v("The following example shows the equivalent router configured by using the Java DSL:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@Bean\npublic IntegrationFlow routerFlow() {\n return IntegrationFlows.from("routingChannel")\n .route(myCustomRouter())\n .get();\n}\n\npublic AbstractMessageRouter myCustomRouter() {\n return new AbstractMessageRouter() {\n\n @Override\n protected Collection determineTargetChannels(Message message) {\n return // determine channel(s) for message\n }\n\n };\n}\n')])])]),a("p",[e._v("Alternately, you can route on data from the message payload, as the following example shows:")]),e._v(" "),a("div",{staticClass:"language- 
extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@Bean\npublic IntegrationFlow routerFlow() {\n return IntegrationFlows.from("routingChannel")\n .route(String.class, p -> p.contains("foo") ? "fooChannel" : "barChannel")\n .get();\n}\n')])])]),a("h4",{attrs:{id:"-2"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#-2"}},[e._v("#")])]),e._v(" "),a("p",[e._v("Sometimes, the routing logic may be simple, and writing a separate class for it and configuring it as a bean may seem like overkill.\nAs of Spring Integration 2.0, we offer an alternative that lets you use SpEL to implement simple computations that previously required a custom POJO router.")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("For more information about the Spring Expression Language, see the "),a("a",{attrs:{href:"https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#expressions",target:"_blank",rel:"noopener noreferrer"}},[e._v("relevant chapter in the Spring Framework Reference Guide"),a("OutboundLink")],1),e._v(".")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("Generally, a SpEL expression is evaluated and its result is mapped to a channel, as the following example shows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n \n \n \n\n')])])]),a("p",[e._v("The following example shows the equivalent router configured in Java:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@Router(inputChannel = "routingChannel")\n@Bean\npublic ExpressionEvaluatingRouter router() {\n ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter("payload.paymentType");\n router.setChannelMapping("CASH", "cashPaymentChannel");\n router.setChannelMapping("CREDIT", "authorizePaymentChannel");\n router.setChannelMapping("DEBIT", "authorizePaymentChannel");\n return 
router;\n}\n')])])]),a("p",[e._v("The following example shows the equivalent router configured in the Java DSL:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@Bean\npublic IntegrationFlow routerFlow() {\n return IntegrationFlows.from("routingChannel")\n .route("payload.paymentType", r -> r\n .channelMapping("CASH", "cashPaymentChannel")\n .channelMapping("CREDIT", "authorizePaymentChannel")\n .channelMapping("DEBIT", "authorizePaymentChannel"))\n .get();\n}\n')])])]),a("p",[e._v("To simplify things even more, the SpEL expression may evaluate to a channel name, as the following expression shows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n')])])]),a("p",[e._v("In the preceding configuration, the result channel is computed by the SpEL expression, which concatenates the value of the "),a("code",[e._v("payload")]),e._v(" with the literal "),a("code",[e._v("String")]),e._v(", 'Channel'.")]),e._v(" "),a("p",[e._v("Another virtue of SpEL for configuring routers is that an expression can return a "),a("code",[e._v("Collection")]),e._v(", effectively making every "),a("code",[e._v("")]),e._v(" a recipient list router.\nWhenever the expression returns multiple channel values, the message is forwarded to each channel.\nThe following example shows such an expression:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n')])])]),a("p",[e._v("In the above configuration, if the message includes a header with a name of 'channels' and the value of that header is a "),a("code",[e._v("List")]),e._v(" of channel names, the message is sent to each channel in the list.\nYou may also find collection projection and collection selection expressions useful when you need to select multiple channels.\nFor further information, see:")]),e._v(" 
"),a("ul",[a("li",[a("p",[a("a",{attrs:{href:"https://docs.spring.io/spring-framework/docs/current/spring-framework-reference/core.html#expressions-collection-projection",target:"_blank",rel:"noopener noreferrer"}},[e._v("Collection Projection"),a("OutboundLink")],1)])]),e._v(" "),a("li",[a("p",[a("a",{attrs:{href:"https://docs.spring.io/spring-framework/docs/current/spring-framework-reference/core.html#expressions-collection-selection",target:"_blank",rel:"noopener noreferrer"}},[e._v("Collection Selection"),a("OutboundLink")],1)])])]),e._v(" "),a("h5",{attrs:{id:"configuring-a-router-with-annotations"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#configuring-a-router-with-annotations"}},[e._v("#")]),e._v(" Configuring a Router with Annotations")]),e._v(" "),a("p",[e._v("When using "),a("code",[e._v("@Router")]),e._v(" to annotate a method, the method may return either a "),a("code",[e._v("MessageChannel")]),e._v(" or a "),a("code",[e._v("String")]),e._v(" type.\nIn the latter case, the endpoint resolves the channel name as it does for the default output channel.\nAdditionally, the method may return either a single value or a collection.\nIf a collection is returned, the reply message is sent to multiple channels.\nTo summarize, the following method signatures are all valid:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("@Router\npublic MessageChannel route(Message message) {...}\n\n@Router\npublic List route(Message message) {...}\n\n@Router\npublic String route(Foo payload) {...}\n\n@Router\npublic List route(Foo payload) {...}\n")])])]),a("p",[e._v("In addition to payload-based routing, a message may be routed based on metadata available within the message header as either a property or an attribute.\nIn this case, a method annotated with "),a("code",[e._v("@Router")]),e._v(" may include a parameter annotated with "),a("code",[e._v("@Header")]),e._v(", which is mapped to a 
header value as the following example shows and documented in "),a("RouterLink",{attrs:{to:"/en/spring-integration/configuration.html#annotations"}},[e._v("Annotation Support")]),e._v(":")],1),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@Router\npublic List route(@Header("orderStatus") OrderStatus status)\n')])])]),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("For routing of XML-based Messages, including XPath support, see "),a("RouterLink",{attrs:{to:"/en/spring-integration/xml.html#xml"}},[e._v("XML Support - Dealing with XML Payloads")]),e._v(".")],1)])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("See also "),a("RouterLink",{attrs:{to:"/en/spring-integration/dsl.html#java-dsl-routers"}},[e._v("Message Routers")]),e._v(" in the Java DSL chapter for more information about router configuration.")],1),e._v(" "),a("h4",{attrs:{id:"dynamic-routers"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#dynamic-routers"}},[e._v("#")]),e._v(" Dynamic Routers")]),e._v(" "),a("p",[e._v("Spring Integration provides quite a few different router configurations for common content-based routing use cases as well as the option of implementing custom routers as POJOs.\nFor example, "),a("code",[e._v("PayloadTypeRouter")]),e._v(" provides a simple way to configure a router that computes channels based on the payload type of the incoming message while "),a("code",[e._v("HeaderValueRouter")]),e._v(" provides the same convenience in configuring a router that computes channels by evaluating the value of a particular message Header.\nThere are also expression-based (SpEL) routers, in which the channel is determined based on evaluating an expression.\nAll of these types of routers exhibit some dynamic characteristics.")]),e._v(" "),a("p",[e._v("However, these routers all require static configuration.\nEven in the case of expression-based routers, the expression itself is defined as part of the 
router configuration, which means that the same expression operating on the same value always results in the computation of the same channel.\nThis is acceptable in most cases, since such routes are well defined and therefore predictable.\nBut there are times when we need to change router configurations dynamically so that message flows may be routed to a different channel.")]),e._v(" "),a("p",[e._v("For example, you might want to bring down some part of your system for maintenance and temporarily reroute messages to a different message flow.\nAs another example, you may want to introduce more granularity to your message flow by adding another route to handle a more concrete type of "),a("code",[e._v("java.lang.Number")]),e._v(" (in the case of "),a("code",[e._v("PayloadTypeRouter")]),e._v(").")]),e._v(" "),a("p",[e._v("Unfortunately, with static router configuration to accomplish either of those goals, you would have to bring down your entire application, change the configuration of the router (change routes), and bring the application back up.\nThis is obviously not a solution anyone wants.")]),e._v(" "),a("p",[e._v("The "),a("a",{attrs:{href:"https://www.enterpriseintegrationpatterns.com/DynamicRouter.html",target:"_blank",rel:"noopener noreferrer"}},[e._v("dynamic router"),a("OutboundLink")],1),e._v(" pattern describes the mechanisms by which you can change or configure routers dynamically without bringing down the system or individual routers.")]),e._v(" "),a("p",[e._v("Before we get into the specifics of how Spring Integration supports dynamic routing, we need to consider the typical flow of a router:")]),e._v(" "),a("ol",[a("li",[a("p",[e._v("Compute a channel identifier, which is a value calculated by the router once it receives the message.\nTypically, it is a String or an instance of the actual "),a("code",[e._v("MessageChannel")]),e._v(".")])]),e._v(" "),a("li",[a("p",[e._v("Resolve the channel identifier to a channel name.\nWe describe specifics of 
this process later in this section.")])]),e._v(" "),a("li",[a("p",[e._v("Resolve the channel name to the actual "),a("code",[e._v("MessageChannel")])])])]),e._v(" "),a("p",[e._v("There is not much that can be done with regard to dynamic routing if Step 1 results in the actual instance of the "),a("code",[e._v("MessageChannel")]),e._v(", because the "),a("code",[e._v("MessageChannel")]),e._v(" is the final product of any router’s job.\nHowever, if the first step results in a channel identifier that is not an instance of "),a("code",[e._v("MessageChannel")]),e._v(", you have quite a few possible ways to influence the process of deriving the "),a("code",[e._v("MessageChannel")]),e._v(".\nConsider the following example of a payload type router:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n \n \n\n')])])]),a("p",[e._v("Within the context of a payload type router, the three steps mentioned earlier would be realized as follows:")]),e._v(" "),a("ol",[a("li",[a("p",[e._v("Compute a channel identifier that is the fully qualified name of the payload type (for example, "),a("code",[e._v("java.lang.String")]),e._v(").")])]),e._v(" "),a("li",[a("p",[e._v("Resolve the channel identifier to a channel name, where the result of the previous step is used to select the appropriate value from the payload type mapping defined in the "),a("code",[e._v("mapping")]),e._v(" element.")])]),e._v(" "),a("li",[a("p",[e._v("Resolve the channel name to the actual instance of the "),a("code",[e._v("MessageChannel")]),e._v(" as a reference to a bean within the application context (which is hopefully a "),a("code",[e._v("MessageChannel")]),e._v(") identified by the result of the previous step.")])])]),e._v(" "),a("p",[e._v("In other words, each step feeds the next step until the process completes.")]),e._v(" "),a("p",[e._v("Now consider an example of a header value router:")]),e._v(" "),a("div",{staticClass:"language- 
extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n \n \n\n')])])]),a("p",[e._v("Now we can consider how the three steps work for a header value router:")]),e._v(" "),a("ol",[a("li",[a("p",[e._v("Compute a channel identifier that is the value of the header identified by the "),a("code",[e._v("header-name")]),e._v(" attribute.")])]),e._v(" "),a("li",[a("p",[e._v("Resolve the channel identifier to a channel name, where the result of the previous step is used to select the appropriate value from the general mapping defined in the "),a("code",[e._v("mapping")]),e._v(" element.")])]),e._v(" "),a("li",[a("p",[e._v("Resolve the channel name to the actual instance of the "),a("code",[e._v("MessageChannel")]),e._v(" as a reference to a bean within the application context (which is hopefully a "),a("code",[e._v("MessageChannel")]),e._v(") identified by the result of the previous step.")])])]),e._v(" "),a("p",[e._v("The preceding two configurations of two different router types look almost identical.\nHowever, if you look at the alternate configuration of the "),a("code",[e._v("HeaderValueRouter")]),e._v(" we clearly see that there is no "),a("code",[e._v("mapping")]),e._v(" sub element, as the following listing shows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n')])])]),a("p",[e._v("However, the configuration is still perfectly valid.\nSo the natural question is what about the mapping in the second step?")]),e._v(" "),a("p",[e._v("The second step is now optional.\nIf "),a("code",[e._v("mapping")]),e._v(" is not defined, then the channel identifier value computed in the first step is automatically treated as the "),a("code",[e._v("channel name")]),e._v(", which is now resolved to the actual "),a("code",[e._v("MessageChannel")]),e._v(", as in the third step.\nWhat it also means is that the second step is one of the key steps to providing dynamic characteristics to 
the routers, since it introduces a process that lets you change the way channel identifier resolves to the channel name, thus influencing the process of determining the final instance of the "),a("code",[e._v("MessageChannel")]),e._v(" from the initial channel identifier.")]),e._v(" "),a("p",[e._v("For example, in the preceding configuration, assume that the "),a("code",[e._v("testHeader")]),e._v(" value is 'kermit', which is now a channel identifier (the first step).\nSince there is no mapping in this router, resolving this channel identifier to a channel name (the second step) is impossible and this channel identifier is now treated as the channel name.\nHowever, what if there was a mapping but for a different value?\nThe end result would still be the same, because, if a new value cannot be determined through the process of resolving the channel identifier to a channel name, the channel identifier becomes the channel name.")]),e._v(" "),a("p",[e._v("All that is left is for the third step to resolve the channel name ('kermit') to an actual instance of the "),a("code",[e._v("MessageChannel")]),e._v(" identified by this name.\nThat basically involves a bean lookup for the provided name.\nNow all messages that contain the header-value pair as "),a("code",[e._v("testHeader=kermit")]),e._v(" are going to be routed to a "),a("code",[e._v("MessageChannel")]),e._v(" whose bean name (its "),a("code",[e._v("id")]),e._v(") is 'kermit'.")]),e._v(" "),a("p",[e._v("But what if you want to route these messages to the 'simpson' channel? 
Obviously changing a static configuration works, but doing so also requires bringing your system down.\nHowever, if you had access to the channel identifier map, you could introduce a new mapping where the header-value pair is now "),a("code",[e._v("kermit=simpson")]),e._v(", thus letting the second step treat 'kermit' as a channel identifier while resolving it to 'simpson' as the channel name.")]),e._v(" "),a("p",[e._v("The same obviously applies for "),a("code",[e._v("PayloadTypeRouter")]),e._v(", where you can now remap or remove a particular payload type mapping.\nIn fact, it applies to every other router, including expression-based routers, since their computed values now have a chance to go through the second step to be resolved to the actual "),a("code",[e._v("channel name")]),e._v(".")]),e._v(" "),a("p",[e._v("Any router that is a subclass of the "),a("code",[e._v("AbstractMappingMessageRouter")]),e._v(" (which includes most framework-defined routers) is a dynamic router, because the "),a("code",[e._v("channelMapping")]),e._v(" is defined at the "),a("code",[e._v("AbstractMappingMessageRouter")]),e._v(" level.\nThat map’s setter method is exposed as a public method along with the 'setChannelMapping' and 'removeChannelMapping' methods.\nThese let you change, add, and remove router mappings at runtime, as long as you have a reference to the router itself.\nIt also means that you could expose these same configuration options through JMX (see "),a("RouterLink",{attrs:{to:"/en/spring-integration/jmx.html#jmx"}},[e._v("JMX Support")]),e._v(") or the Spring Integration control bus (see "),a("RouterLink",{attrs:{to:"/en/spring-integration/control-bus.html#control-bus"}},[e._v("Control Bus")]),e._v(") functionality.")],1),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("Falling back to the channel key as the channel name is flexible and convenient."),a("br"),e._v("However, if you don’t trust the message creator, a malicious actor (who has 
knowledge of the system) could create a message that is routed to an unexpected channel."),a("br"),e._v("For example, if the key is set to the channel name of the router’s input channel, such a message would be routed back to the router, eventually resulting in a stack overflow error."),a("br"),e._v("You may therefore wish to disable this feature (set the "),a("code",[e._v("channelKeyFallback")]),e._v(" property to "),a("code",[e._v("false")]),e._v("), and change the mappings instead if needed.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("h5",{attrs:{id:"manage-router-mappings-using-the-control-bus"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#manage-router-mappings-using-the-control-bus"}},[e._v("#")]),e._v(" Manage Router Mappings using the Control Bus")]),e._v(" "),a("p",[e._v("One way to manage the router mappings is through the "),a("a",{attrs:{href:"https://www.enterpriseintegrationpatterns.com/ControlBus.html",target:"_blank",rel:"noopener noreferrer"}},[e._v("control bus"),a("OutboundLink")],1),e._v(" pattern, which exposes a control channel to which you can send control messages to manage and monitor Spring Integration components, including routers.")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("For more information about the control bus, see "),a("RouterLink",{attrs:{to:"/en/spring-integration/control-bus.html#control-bus"}},[e._v("Control Bus")]),e._v(".")],1)])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("Typically, you would send a control message asking to invoke a particular operation on a particular managed component (such as a\nrouter).\nThe following managed operations (methods) are specific to changing the router resolution process:")]),e._v(" "),a("ul",[a("li",[a("p",[a("code",[e._v("public void setChannelMapping(String key, String channelName)")]),e._v(": Lets you add a new or modify an existing mapping between "),a("code",[e._v("channel identifier")]),e._v(" and "),a("code",[e._v("channel 
name")])])]),e._v(" "),a("li",[a("p",[a("code",[e._v("public void removeChannelMapping(String key)")]),e._v(": Lets you remove a particular channel mapping, thus disconnecting the relationship between "),a("code",[e._v("channel identifier")]),e._v(" and "),a("code",[e._v("channel name")])])])]),e._v(" "),a("p",[e._v("Note that these methods can be used for simple changes (such as updating a single route or adding or removing a route).\nHowever, if you want to remove one route and add another, the updates are not atomic.\nThis means that the routing table may be in an indeterminate state between the updates.\nStarting with version 4.0, you can now use the control bus to update the entire routing table atomically.\nThe following methods let you do so:")]),e._v(" "),a("ul",[a("li",[a("p",[a("code",[e._v("public MapgetChannelMappings()")]),e._v(": Returns the current mappings.")])]),e._v(" "),a("li",[a("p",[a("code",[e._v("public void replaceChannelMappings(Properties channelMappings)")]),e._v(": Updates the mappings.\nNote that the "),a("code",[e._v("channelMappings")]),e._v(" parameter is a "),a("code",[e._v("Properties")]),e._v(" object.\nThis arrangement lets a control bus command use the built-in "),a("code",[e._v("StringToPropertiesConverter")]),e._v(", as the following example shows:")])])]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("\"@'router.handler'.replaceChannelMappings('foo=qux \\n baz=bar')\"\n")])])]),a("p",[e._v("Note that each mapping is separated by a newline character ("),a("code",[e._v("\\n")]),e._v(").\nFor programmatic changes to the map, we recommend that you use the "),a("code",[e._v("setChannelMappings")]),e._v(" method, due to type-safety concerns."),a("code",[e._v("replaceChannelMappings")]),e._v(" ignores keys or values that are not "),a("code",[e._v("String")]),e._v(" objects.")]),e._v(" 
"),a("h5",{attrs:{id:"manage-router-mappings-by-using-jmx"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#manage-router-mappings-by-using-jmx"}},[e._v("#")]),e._v(" Manage Router Mappings by Using JMX")]),e._v(" "),a("p",[e._v("You can also use Spring’s JMX support to expose a router instance and then use your favorite JMX client (for example, JConsole) to manage those operations (methods) for changing the router’s configuration.")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("For more information about Spring Integration’s JMX support, see "),a("RouterLink",{attrs:{to:"/en/spring-integration/jmx.html#jmx"}},[e._v("JMX Support")]),e._v(".")],1)])]),e._v(" "),a("tbody")]),e._v(" "),a("h5",{attrs:{id:"routing-slip"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#routing-slip"}},[e._v("#")]),e._v(" Routing Slip")]),e._v(" "),a("p",[e._v("Starting with version 4.1, Spring Integration provides an implementation of the "),a("a",{attrs:{href:"https://www.enterpriseintegrationpatterns.com/RoutingTable.html",target:"_blank",rel:"noopener noreferrer"}},[e._v("routing slip"),a("OutboundLink")],1),e._v(" enterprise integration pattern.\nIt is implemented as a "),a("code",[e._v("routingSlip")]),e._v(" message header, which is used to determine the next channel in "),a("code",[e._v("AbstractMessageProducingHandler")]),e._v(" instances, when an "),a("code",[e._v("outputChannel")]),e._v(" is not specified for the endpoint.\nThis pattern is useful in complex, dynamic cases, when it can become difficult to configure multiple routers to determine message flow.\nWhen a message arrives at an endpoint that has no "),a("code",[e._v("output-channel")]),e._v(", the "),a("code",[e._v("routingSlip")]),e._v(" is consulted to determine the next channel to which the message is sent.\nWhen the routing slip is exhausted, normal "),a("code",[e._v("replyChannel")]),e._v(" processing resumes.")]),e._v(" "),a("p",[e._v("Configuration for the routing slip is 
presented as a "),a("code",[e._v("HeaderEnricher")]),e._v(" option — a semicolon-separated routing slip that contains "),a("code",[e._v("path")]),e._v(" entries, as the following example shows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n channel1\n request.headers[myRoutingSlipChannel]\n\n\n\n\n\n \n\n')])])]),a("p",[e._v("The preceding example has:")]),e._v(" "),a("ul",[a("li",[a("p",[e._v("A "),a("code",[e._v("")]),e._v(" configuration to demonstrate that the entries in the routing slip "),a("code",[e._v("path")]),e._v(" can be specified as resolvable keys.")])]),e._v(" "),a("li",[a("p",[e._v("The "),a("code",[e._v("")]),e._v(" "),a("code",[e._v("")]),e._v(" sub-element is used to populate the "),a("code",[e._v("RoutingSlipHeaderValueMessageProcessor")]),e._v(" to the "),a("code",[e._v("HeaderEnricher")]),e._v(" handler.")])]),e._v(" "),a("li",[a("p",[e._v("The "),a("code",[e._v("RoutingSlipHeaderValueMessageProcessor")]),e._v(" accepts a "),a("code",[e._v("String")]),e._v(" array of resolved routing slip "),a("code",[e._v("path")]),e._v(" entries and returns (from "),a("code",[e._v("processMessage()")]),e._v(") a "),a("code",[e._v("singletonMap")]),e._v(" with the "),a("code",[e._v("path")]),e._v(" as "),a("code",[e._v("key")]),e._v(" and "),a("code",[e._v("0")]),e._v(" as initial "),a("code",[e._v("routingSlipIndex")]),e._v(".")])])]),e._v(" "),a("p",[e._v("Routing Slip "),a("code",[e._v("path")]),e._v(" entries can contain "),a("code",[e._v("MessageChannel")]),e._v(" bean names, "),a("code",[e._v("RoutingSlipRouteStrategy")]),e._v(" bean names, and Spring expressions (SpEL).\nThe "),a("code",[e._v("RoutingSlipHeaderValueMessageProcessor")]),e._v(" checks each routing slip "),a("code",[e._v("path")]),e._v(" entry against the "),a("code",[e._v("BeanFactory")]),e._v(" on the first "),a("code",[e._v("processMessage")]),e._v(" invocation.\nIt converts entries (which are not bean 
names in the application context) to "),a("code",[e._v("ExpressionEvaluatingRoutingSlipRouteStrategy")]),e._v(" instances."),a("code",[e._v("RoutingSlipRouteStrategy")]),e._v(" entries are invoked multiple times, until they return null or an empty "),a("code",[e._v("String")]),e._v(".")]),e._v(" "),a("p",[e._v("Since the routing slip is involved in the "),a("code",[e._v("getOutputChannel")]),e._v(" process, we have a request-reply context.\nThe "),a("code",[e._v("RoutingSlipRouteStrategy")]),e._v(" has been introduced to determine the next "),a("code",[e._v("outputChannel")]),e._v(" that uses the "),a("code",[e._v("requestMessage")]),e._v(" and the "),a("code",[e._v("reply")]),e._v(" object.\nAn implementation of this strategy should be registered as a bean in the application context, and its bean name is used in the routing slip "),a("code",[e._v("path")]),e._v(".\nThe "),a("code",[e._v("ExpressionEvaluatingRoutingSlipRouteStrategy")]),e._v(" implementation is provided.\nIt accepts a SpEL expression and an internal "),a("code",[e._v("ExpressionEvaluatingRoutingSlipRouteStrategy.RequestAndReply")]),e._v(" object is used as the root object of the evaluation context.\nThis is to avoid the overhead of "),a("code",[e._v("EvaluationContext")]),e._v(" creation for each "),a("code",[e._v("ExpressionEvaluatingRoutingSlipRouteStrategy.getNextPath()")]),e._v(" invocation.\nIt is a simple Java bean with two properties: "),a("code",[e._v("Message request")]),e._v(" and "),a("code",[e._v("Object reply")]),e._v(".\nWith this expression implementation, we can specify routing slip "),a("code",[e._v("path")]),e._v(" entries by using SpEL (for example, "),a("code",[e._v("@routingSlipRoutingPojo.get(request, reply)")]),e._v(" and "),a("code",[e._v("request.headers[myRoutingSlipChannel]")]),e._v(") and avoid defining a bean for the "),a("code",[e._v("RoutingSlipRouteStrategy")]),e._v(".")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("The 
"),a("code",[e._v("requestMessage")]),e._v(" argument is always a "),a("code",[e._v("Message")]),e._v("."),a("br"),e._v("Depending on context, the reply object may be a "),a("code",[e._v("Message")]),e._v(", an "),a("code",[e._v("AbstractIntegrationMessageBuilder")]),e._v(", or an arbitrary application domain object (when, for example, it is returned by a POJO method invoked by a service activator)."),a("br"),e._v("In the first two cases, the usual "),a("code",[e._v("Message")]),e._v(" properties ("),a("code",[e._v("payload")]),e._v(" and "),a("code",[e._v("headers")]),e._v(") are available when using SpEL (or a Java implementation)."),a("br"),e._v("For an arbitrary domain object, these properties are not available."),a("br"),e._v("For this reason, be careful when you use routing slips in conjunction with POJO methods if the result is used to determine the next path.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("If a routing slip is involved in a distributed environment, we recommend not using inline expressions for the Routing Slip "),a("code",[e._v("path")]),e._v("."),a("br"),e._v("This recommendation applies to distributed environments such as cross-JVM applications, using a "),a("code",[e._v("request-reply")]),e._v(" through a message broker (such as"),a("RouterLink",{attrs:{to:"/en/spring-integration/amqp.html#amqp"}},[e._v("AMQP Support")]),e._v(" or "),a("RouterLink",{attrs:{to:"/en/spring-integration/jms.html#jms"}},[e._v("JMS Support")]),e._v("), or using a persistent "),a("code",[e._v("MessageStore")]),e._v(" ("),a("RouterLink",{attrs:{to:"/en/spring-integration/message-store.html#message-store"}},[e._v("Message Store")]),e._v(") in the integration flow."),a("br"),e._v("The framework uses "),a("code",[e._v("RoutingSlipHeaderValueMessageProcessor")]),e._v(" to convert them to "),a("code",[e._v("ExpressionEvaluatingRoutingSlipRouteStrategy")]),e._v(" objects, and they are used in the 
"),a("code",[e._v("routingSlip")]),e._v(" message header."),a("br"),e._v("Since this class is not "),a("code",[e._v("Serializable")]),e._v(" (it cannot be, because it depends on the "),a("code",[e._v("BeanFactory")]),e._v("), the entire "),a("code",[e._v("Message")]),e._v(" becomes non-serializable and, in any distributed operation, we end up with a "),a("code",[e._v("NotSerializableException")]),e._v("."),a("br"),e._v("To overcome this limitation, register an "),a("code",[e._v("ExpressionEvaluatingRoutingSlipRouteStrategy")]),e._v(" bean with the desired SpEL and use its bean name in the routing slip "),a("code",[e._v("path")]),e._v(" configuration.")],1)])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("For Java configuration, you can add a "),a("code",[e._v("RoutingSlipHeaderValueMessageProcessor")]),e._v(" instance to the "),a("code",[e._v("HeaderEnricher")]),e._v(" bean definition, as the following example shows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@Bean\n@Transformer(inputChannel = "routingSlipHeaderChannel")\npublic HeaderEnricher headerEnricher() {\n return new HeaderEnricher(Collections.singletonMap(IntegrationMessageHeaderAccessor.ROUTING_SLIP,\n new RoutingSlipHeaderValueMessageProcessor("myRoutePath1",\n "@routingSlipRoutingPojo.get(request, reply)",\n "routingSlipRoutingStrategy",\n "request.headers[myRoutingSlipChannel]",\n "finishChannel")));\n}\n')])])]),a("p",[e._v("The routing slip algorithm works as follows when an endpoint produces a reply and no "),a("code",[e._v("outputChannel")]),e._v(" has been defined:")]),e._v(" "),a("ul",[a("li",[a("p",[e._v("The "),a("code",[e._v("routingSlipIndex")]),e._v(" is used to get a value from the routing slip "),a("code",[e._v("path")]),e._v(" list.")])]),e._v(" "),a("li",[a("p",[e._v("If the value from "),a("code",[e._v("routingSlipIndex")]),e._v(" is "),a("code",[e._v("String")]),e._v(", it is used to get a bean from 
"),a("code",[e._v("BeanFactory")]),e._v(".")])]),e._v(" "),a("li",[a("p",[e._v("If a returned bean is an instance of "),a("code",[e._v("MessageChannel")]),e._v(", it is used as the next "),a("code",[e._v("outputChannel")]),e._v(" and the "),a("code",[e._v("routingSlipIndex")]),e._v(" is incremented in the reply message header (the routing slip "),a("code",[e._v("path")]),e._v(" entries remain unchanged).")])]),e._v(" "),a("li",[a("p",[e._v("If a returned bean is an instance of "),a("code",[e._v("RoutingSlipRouteStrategy")]),e._v(" and its "),a("code",[e._v("getNextPath")]),e._v(" does not return an empty "),a("code",[e._v("String")]),e._v(", that result is used as a bean name for the next "),a("code",[e._v("outputChannel")]),e._v(".\nThe "),a("code",[e._v("routingSlipIndex")]),e._v(" remains unchanged.")])]),e._v(" "),a("li",[a("p",[e._v("If "),a("code",[e._v("RoutingSlipRouteStrategy.getNextPath")]),e._v(" returns an empty "),a("code",[e._v("String")]),e._v(" or "),a("code",[e._v("null")]),e._v(", the "),a("code",[e._v("routingSlipIndex")]),e._v(" is incremented and the "),a("code",[e._v("getOutputChannelFromRoutingSlip")]),e._v(" is invoked recursively for the next Routing Slip "),a("code",[e._v("path")]),e._v(" item.")])]),e._v(" "),a("li",[a("p",[e._v("If the next routing slip "),a("code",[e._v("path")]),e._v(" entry is not a "),a("code",[e._v("String")]),e._v(", it must be an instance of "),a("code",[e._v("RoutingSlipRouteStrategy")]),e._v(".")])]),e._v(" "),a("li",[a("p",[e._v("When the "),a("code",[e._v("routingSlipIndex")]),e._v(" exceeds the size of the routing slip "),a("code",[e._v("path")]),e._v(" list, the algorithm moves to the default behavior for the standard "),a("code",[e._v("replyChannel")]),e._v(" header.")])])]),e._v(" "),a("h4",{attrs:{id:"process-manager-enterprise-integration-pattern"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#process-manager-enterprise-integration-pattern"}},[e._v("#")]),e._v(" Process Manager Enterprise 
Integration Pattern")]),e._v(" "),a("p",[e._v("Enterprise integration patterns include the "),a("a",{attrs:{href:"https://www.enterpriseintegrationpatterns.com/ProcessManager.html",target:"_blank",rel:"noopener noreferrer"}},[e._v("process manager"),a("OutboundLink")],1),e._v(" pattern.\nYou can now easily implement this pattern by using custom process manager logic encapsulated in a "),a("code",[e._v("RoutingSlipRouteStrategy")]),e._v(" within the routing slip.\nIn addition to a bean name, the "),a("code",[e._v("RoutingSlipRouteStrategy")]),e._v(" can return any "),a("code",[e._v("MessageChannel")]),e._v(" object, and there is no requirement that this "),a("code",[e._v("MessageChannel")]),e._v(" instance be a bean in the application context.\nThis way, we can provide powerful dynamic routing logic when there is no way to predict which channel should be used.\nA "),a("code",[e._v("MessageChannel")]),e._v(" can be created within the "),a("code",[e._v("RoutingSlipRouteStrategy")]),e._v(" and returned.\nA "),a("code",[e._v("FixedSubscriberChannel")]),e._v(" with an associated "),a("code",[e._v("MessageHandler")]),e._v(" implementation is a good combination for such cases.\nFor example, you can route to a "),a("a",{attrs:{href:"https://projectreactor.io/docs/core/release/reference/#getting-started",target:"_blank",rel:"noopener noreferrer"}},[e._v("Reactive Streams"),a("OutboundLink")],1),e._v(", as the following example shows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("@Bean\npublic PollableChannel resultsChannel() {\n return new QueueChannel();\n}\n@Bean\npublic RoutingSlipRouteStrategy routeStrategy() {\n return (requestMessage, reply) -> requestMessage.getPayload() instanceof String\n ? 
new FixedSubscriberChannel(m ->\n Mono.just((String) m.getPayload())\n .map(String::toUpperCase)\n .subscribe(v -> messagingTemplate().convertAndSend(resultsChannel(), v)))\n : new FixedSubscriberChannel(m ->\n Mono.just((Integer) m.getPayload())\n .map(v -> v * 2)\n .subscribe(v -> messagingTemplate().convertAndSend(resultsChannel(), v)));\n}\n")])])]),a("h3",{attrs:{id:"filter"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#filter"}},[e._v("#")]),e._v(" Filter")]),e._v(" "),a("p",[e._v("Message filters are used to decide whether a "),a("code",[e._v("Message")]),e._v(" should be passed along or dropped based on some criteria, such as a message header value or message content itself.\nTherefore, a message filter is similar to a router, except that, for each message received from the filter’s input channel, that same message may or may not be sent to the filter’s output channel.\nUnlike the router, it makes no decision regarding which message channel to send the message to but decides only whether to send the message at all.")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("As we describe later in this section, the filter also supports a discard channel."),a("br"),e._v("In certain cases, it can play the role of a very simple router (or “switch”), based on a boolean condition.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("In Spring Integration, you can configure a message filter as a message endpoint that delegates to an implementation of the "),a("code",[e._v("MessageSelector")]),e._v(" interface.\nThat interface is itself quite simple, as the following listing shows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("public interface MessageSelector {\n\n boolean accept(Message message);\n\n}\n")])])]),a("p",[e._v("The "),a("code",[e._v("MessageFilter")]),e._v(" constructor accepts a selector instance, as the following example shows:")]),e._v(" 
"),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("MessageFilter filter = new MessageFilter(someSelector);\n")])])]),a("p",[e._v("In combination with the namespace and SpEL, you can configure powerful filters with very little Java code.")]),e._v(" "),a("h4",{attrs:{id:"configuring-a-filter-with-xml"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#configuring-a-filter-with-xml"}},[e._v("#")]),e._v(" Configuring a Filter with XML")]),e._v(" "),a("p",[e._v("You can use the "),a("code",[e._v("")]),e._v(" element is used to create a message-selecting endpoint.\nIn addition to "),a("code",[e._v("input-channel")]),e._v(" and "),a("code",[e._v("output-channel")]),e._v(" attributes, it requires a "),a("code",[e._v("ref")]),e._v(" attribute.\nThe "),a("code",[e._v("ref")]),e._v(" can point to a "),a("code",[e._v("MessageSelector")]),e._v(" implementation, as the following example shows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n\n\n')])])]),a("p",[e._v("Alternatively, you can add the "),a("code",[e._v("method")]),e._v(" attribute.\nIn that case, the "),a("code",[e._v("ref")]),e._v(" attribute may refer to any object.\nThe referenced method may expect either the "),a("code",[e._v("Message")]),e._v(" type or the payload type of inbound messages.\nThe method must return a boolean value.\nIf the method returns 'true', the message is sent to the output channel.\nThe following example shows how to configure a filter that uses the "),a("code",[e._v("method")]),e._v(" attribute:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n\n\n')])])]),a("p",[e._v("If the selector or adapted POJO method returns "),a("code",[e._v("false")]),e._v(", a few settings control the handling of the rejected message.\nBy default (if configured as in the preceding example), 
rejected messages are silently dropped.\nIf rejection should instead result in an error condition, set the "),a("code",[e._v("throw-exception-on-rejection")]),e._v(" attribute to "),a("code",[e._v("true")]),e._v(", as the following example shows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n')])])]),a("p",[e._v("If you want rejected messages to be routed to a specific channel, provide that reference as the "),a("code",[e._v("discard-channel")]),e._v(", as the following example shows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n')])])]),a("p",[e._v("See also "),a("RouterLink",{attrs:{to:"/en/spring-integration/handler-advice.html#advising-filters"}},[e._v("Advising Filters")]),e._v(".")],1),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("Message filters are commonly used in conjunction with a publish-subscribe channel."),a("br"),e._v("Many filter endpoints may be subscribed to the same channel, and they decide whether or not to pass the message to the next endpoint, which could be any of the supported types (such as a service activator)."),a("br"),e._v("This provides a reactive alternative to the more proactive approach of using a message router with a single point-to-point input channel and multiple output channels.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("We recommend using a "),a("code",[e._v("ref")]),e._v(" attribute if the custom filter implementation is referenced in other "),a("code",[e._v("")]),e._v(" definitions.\nHowever, if the custom filter implementation is scoped to a single "),a("code",[e._v("")]),e._v(" element, you should provide an inner bean definition, as the following example shows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n 
\n\n')])])]),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("Using both the "),a("code",[e._v("ref")]),e._v(" attribute and an inner handler definition in the same "),a("code",[e._v("")]),e._v(" configuration is not allowed, as it creates an ambiguous condition and throws an exception.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("If the "),a("code",[e._v("ref")]),e._v(" attribute references a bean that extends "),a("code",[e._v("MessageFilter")]),e._v(" (such as filters provided by the framework itself), the configuration is optimized by injecting the output channel into the filter bean directly."),a("br"),e._v("In this case, each "),a("code",[e._v("ref")]),e._v(" must be to a separate bean instance (or a "),a("code",[e._v("prototype")]),e._v("-scoped bean) or use the inner "),a("code",[e._v("")]),e._v(" configuration type."),a("br"),e._v("However, this optimization applies only if you do not provide any filter-specific attributes in the filter XML definition."),a("br"),e._v("If you inadvertently reference the same message handler from multiple beans, you get a configuration exception.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("With the introduction of SpEL support, Spring Integration added the "),a("code",[e._v("expression")]),e._v(" attribute to the filter element.\nIt can be used to avoid Java entirely for simple filters, as the following example shows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n')])])]),a("p",[e._v("The string passed as the value of the expression attribute is evaluated as a SpEL expression with the message available in the evaluation context.\nIf you must include the result of an expression in the scope of the application context, you can use the "),a("code",[e._v("#{}")]),e._v(" notation, as defined in the 
"),a("a",{attrs:{href:"https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#expressions-beandef",target:"_blank",rel:"noopener noreferrer"}},[e._v("SpEL reference documentation"),a("OutboundLink")],1),e._v(", as the following example shows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n')])])]),a("p",[e._v("If the expression itself needs to be dynamic, you can use an 'expression' sub-element.\nThat provides a level of indirection for resolving the expression by its key from an "),a("code",[e._v("ExpressionSource")]),e._v(".\nThat is a strategy interface that you can implement directly, or you can rely upon a version available in Spring Integration that loads expressions from a “resource bundle” and can check for modifications after a given number of seconds.\nAll of this is demonstrated in the following configuration example, where the expression could be reloaded within one minute if the underlying file had been modified:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n \n\n\n\n \n \n\n')])])]),a("p",[e._v("If the "),a("code",[e._v("ExpressionSource")]),e._v(" bean is named "),a("code",[e._v("expressionSource")]),e._v(", you need not provide the"),a("code",[e._v("source")]),e._v(" attribute on the "),a("code",[e._v("")]),e._v(" element.\nHowever, in the preceding example, we show it for completeness.")]),e._v(" "),a("p",[e._v("The 'config/integration/expressions.properties' file (or any more-specific version with a locale extension to be resolved in the typical way that resource-bundles are loaded) can contain a key/value pair, as the following example shows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("filterPatterns.example=payload > 100\n")])])]),a("table",[a("thead",[a("tr",[a("th"),e._v(" 
"),a("th",[e._v("All of these examples that use "),a("code",[e._v("expression")]),e._v(" as an attribute or sub-element can also be applied within transformer, router, splitter, service-activator, and header-enricher elements."),a("br"),e._v("The semantics and role of the given component type would affect the interpretation of the evaluation result, in the same way that the return value of a method-invocation would be interpreted."),a("br"),e._v("For example, an expression can return strings that are to be treated as message channel names by a router component."),a("br"),e._v("However, the underlying functionality of evaluating the expression against the message as the root object and resolving bean names if prefixed with '@' is consistent across all of the core EIP components within Spring Integration.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("h4",{attrs:{id:"configuring-a-filter-with-annotations"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#configuring-a-filter-with-annotations"}},[e._v("#")]),e._v(" Configuring a Filter with Annotations")]),e._v(" "),a("p",[e._v("The following example shows how to configure a filter by using annotations:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("public class PetFilter {\n ...\n @Filter (1)\n public boolean dogsOnly(String input) {\n ...\n }\n}\n")])])]),a("table",[a("thead",[a("tr",[a("th",[a("strong",[e._v("1")])]),e._v(" "),a("th",[e._v("An annotation indicating that this method is to be used as a filter."),a("br"),e._v("It must be specified if this class is to be used as a filter.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("All of the configuration options provided by the XML element are also available for the "),a("code",[e._v("@Filter")]),e._v(" annotation.")]),e._v(" "),a("p",[e._v("The filter can be either referenced explicitly from XML or, if the "),a("code",[e._v("@MessageEndpoint")]),e._v(" annotation is defined on the 
class, detected automatically through classpath scanning.")]),e._v(" "),a("p",[e._v("See also "),a("RouterLink",{attrs:{to:"/en/spring-integration/handler-advice.html#advising-with-annotations"}},[e._v("Advising Endpoints Using Annotations")]),e._v(".")],1),e._v(" "),a("h3",{attrs:{id:"splitter"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#splitter"}},[e._v("#")]),e._v(" Splitter")]),e._v(" "),a("p",[e._v("The splitter is a component whose role is to partition a message into several parts and send the resulting messages to be processed independently.\nVery often, they are upstream producers in a pipeline that includes an aggregator.")]),e._v(" "),a("h4",{attrs:{id:"programming-model"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#programming-model"}},[e._v("#")]),e._v(" Programming Model")]),e._v(" "),a("p",[e._v("The API for performing splitting consists of one base class, "),a("code",[e._v("AbstractMessageSplitter")]),e._v(".\nIt is a "),a("code",[e._v("MessageHandler")]),e._v(" implementation that encapsulates features common to splitters, such as filling in the appropriate message headers ("),a("code",[e._v("CORRELATION_ID")]),e._v(", "),a("code",[e._v("SEQUENCE_SIZE")]),e._v(", and "),a("code",[e._v("SEQUENCE_NUMBER")]),e._v(") on the messages that are produced.\nThis filling enables tracking down the messages and the results of their processing (in a typical scenario, these headers get copied to the messages that are produced by the various transforming endpoints).\nThe values can then be used, for example, by a "),a("a",{attrs:{href:"https://www.enterpriseintegrationpatterns.com/DistributionAggregate.html",target:"_blank",rel:"noopener noreferrer"}},[e._v("composed message processor"),a("OutboundLink")],1),e._v(".")]),e._v(" "),a("p",[e._v("The following example shows an excerpt from "),a("code",[e._v("AbstractMessageSplitter")]),e._v(":")]),e._v(" "),a("div",{staticClass:"language- 
extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("public abstract class AbstractMessageSplitter\n extends AbstractReplyProducingMessageConsumer {\n ...\n protected abstract Object splitMessage(Message message);\n\n}\n")])])]),a("p",[e._v("To implement a specific splitter in an application, you can extend "),a("code",[e._v("AbstractMessageSplitter")]),e._v(" and implement the "),a("code",[e._v("splitMessage")]),e._v(" method, which contains logic for splitting the messages.\nThe return value can be one of the following:")]),e._v(" "),a("ul",[a("li",[a("p",[e._v("A "),a("code",[e._v("Collection")]),e._v(" or an array of messages or an "),a("code",[e._v("Iterable")]),e._v(" (or "),a("code",[e._v("Iterator")]),e._v(") that iterates over messages.\nIn this case, the messages are sent as messages (after the "),a("code",[e._v("CORRELATION_ID")]),e._v(", "),a("code",[e._v("SEQUENCE_SIZE")]),e._v(" and "),a("code",[e._v("SEQUENCE_NUMBER")]),e._v(" are populated).\nUsing this approach gives you more control — for example, to populate custom message headers as part of the splitting process.")])]),e._v(" "),a("li",[a("p",[e._v("A "),a("code",[e._v("Collection")]),e._v(" or an array of non-message objects or an "),a("code",[e._v("Iterable")]),e._v(" (or "),a("code",[e._v("Iterator")]),e._v(") that iterates over non-message objects.\nIt works like the prior case, except that each collection element is used as a message payload.\nUsing this approach lets you focus on the domain objects without having to consider the messaging system and produces code that is easier to test.")])]),e._v(" "),a("li",[a("p",[e._v("a "),a("code",[e._v("Message")]),e._v(" or non-message object (but not a collection or an array).\nIt works like the previous cases, except that a single message is sent out.")])])]),e._v(" "),a("p",[e._v("In Spring Integration, any POJO can implement the splitting algorithm, provided that it defines a method that accepts a single argument and 
has a return value.\nIn this case, the return value of the method is interpreted as described earlier.\nThe input argument might either be a "),a("code",[e._v("Message")]),e._v(" or a simple POJO.\nIn the latter case, the splitter receives the payload of the incoming message.\nWe recommend this approach, because it decouples the code from the Spring Integration API and is typically easier to test.")]),e._v(" "),a("h5",{attrs:{id:"iterators"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#iterators"}},[e._v("#")]),e._v(" Iterators")]),e._v(" "),a("p",[e._v("Starting with version 4.1, the "),a("code",[e._v("AbstractMessageSplitter")]),e._v(" supports the "),a("code",[e._v("Iterator")]),e._v(" type for the "),a("code",[e._v("value")]),e._v(" to split.\nNote, in the case of an "),a("code",[e._v("Iterator")]),e._v(" (or "),a("code",[e._v("Iterable")]),e._v("), we don’t have access to the number of underlying items and the "),a("code",[e._v("SEQUENCE_SIZE")]),e._v(" header is set to "),a("code",[e._v("0")]),e._v(".\nThis means that the default "),a("code",[e._v("SequenceSizeReleaseStrategy")]),e._v(" of an "),a("code",[e._v("")]),e._v(" won’t work and the group for the "),a("code",[e._v("CORRELATION_ID")]),e._v(" from the "),a("code",[e._v("splitter")]),e._v(" won’t be released; it will remain as "),a("code",[e._v("incomplete")]),e._v(".\nIn this case you should use an appropriate custom "),a("code",[e._v("ReleaseStrategy")]),e._v(" or rely on "),a("code",[e._v("send-partial-result-on-expiry")]),e._v(" together with "),a("code",[e._v("group-timeout")]),e._v(" or a "),a("code",[e._v("MessageGroupStoreReaper")]),e._v(".")]),e._v(" "),a("p",[e._v("Starting with version 5.0, the "),a("code",[e._v("AbstractMessageSplitter")]),e._v(" provides "),a("code",[e._v("protected obtainSizeIfPossible()")]),e._v(" methods to allow the determination of the size of the "),a("code",[e._v("Iterable")]),e._v(" and "),a("code",[e._v("Iterator")]),e._v(" objects if that is possible.\nFor 
example "),a("code",[e._v("XPathMessageSplitter")]),e._v(" can determine the size of the underlying "),a("code",[e._v("NodeList")]),e._v(" object.\nAnd starting with version 5.0.9, this method also properly returns a size of the "),a("code",[e._v("com.fasterxml.jackson.core.TreeNode")]),e._v(".")]),e._v(" "),a("p",[e._v("An "),a("code",[e._v("Iterator")]),e._v(" object is useful to avoid the need for building an entire collection in the memory before splitting.\nFor example, when underlying items are populated from some external system (e.g. DataBase or FTP "),a("code",[e._v("MGET")]),e._v(") using iterations or streams.")]),e._v(" "),a("h5",{attrs:{id:"stream-and-flux"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#stream-and-flux"}},[e._v("#")]),e._v(" Stream and Flux")]),e._v(" "),a("p",[e._v("Starting with version 5.0, the "),a("code",[e._v("AbstractMessageSplitter")]),e._v(" supports the Java "),a("code",[e._v("Stream")]),e._v(" and Reactive Streams "),a("code",[e._v("Publisher")]),e._v(" types for the "),a("code",[e._v("value")]),e._v(" to split.\nIn this case, the target "),a("code",[e._v("Iterator")]),e._v(" is built on their iteration functionality.")]),e._v(" "),a("p",[e._v("In addition, if the splitter’s output channel is an instance of a "),a("code",[e._v("ReactiveStreamsSubscribableChannel")]),e._v(", the "),a("code",[e._v("AbstractMessageSplitter")]),e._v(" produces a "),a("code",[e._v("Flux")]),e._v(" result instead of an "),a("code",[e._v("Iterator")]),e._v(", and the output channel is subscribed to this "),a("code",[e._v("Flux")]),e._v(" for back-pressure-based splitting on downstream flow demand.")]),e._v(" "),a("p",[e._v("Starting with version 5.2, the splitter supports a "),a("code",[e._v("discardChannel")]),e._v(" option for sending those request messages for which a split function has returned an empty container (collection, array, stream, "),a("code",[e._v("Flux")]),e._v(" etc.).\nIn this case there is just no item to iterate for sending 
to the "),a("code",[e._v("outputChannel")]),e._v(".\nThe "),a("code",[e._v("null")]),e._v(" splitting result remains as an end of flow indicator.")]),e._v(" "),a("h4",{attrs:{id:"configuring-a-splitter-with-xml"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#configuring-a-splitter-with-xml"}},[e._v("#")]),e._v(" Configuring a Splitter with XML")]),e._v(" "),a("p",[e._v("A splitter can be configured through XML as follows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n\n (6)\n\n\n\n\n')])])]),a("table",[a("thead",[a("tr",[a("th",[a("strong",[e._v("1")])]),e._v(" "),a("th",[e._v("The ID of the splitter is optional.")])])]),e._v(" "),a("tbody",[a("tr",[a("td",[a("strong",[e._v("2")])]),e._v(" "),a("td",[e._v("A reference to a bean defined in the application context."),a("br"),e._v("The bean must implement the splitting logic, as described in the earlier section."),a("br"),e._v("Optional."),a("br"),e._v("If a reference to a bean is not provided, it is assumed that the payload of the message that arrived on the "),a("code",[e._v("input-channel")]),e._v(" is an implementation of "),a("code",[e._v("java.util.Collection")]),e._v(" and the default splitting logic is applied to the collection, incorporating each individual element into a message and sending it to the "),a("code",[e._v("output-channel")]),e._v(".")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("3")])]),e._v(" "),a("td",[e._v("The method (defined on the bean) that implements the splitting logic."),a("br"),e._v("Optional.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("4")])]),e._v(" "),a("td",[e._v("The input channel of the splitter."),a("br"),e._v("Required.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("5")])]),e._v(" "),a("td",[e._v("The channel to which the splitter sends the results of splitting the incoming message."),a("br"),e._v("Optional (because incoming messages can specify a reply channel 
themselves).")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("6")])]),e._v(" "),a("td",[e._v("The channel to which the request message is sent in case of empty splitting result."),a("br"),e._v("Optional (the will stop as in case of "),a("code",[e._v("null")]),e._v(" result).")])])])]),e._v(" "),a("p",[e._v("We recommend using a "),a("code",[e._v("ref")]),e._v(" attribute if the custom splitter implementation can be referenced in other "),a("code",[e._v("")]),e._v(" definitions.\nHowever if the custom splitter handler implementation should be scoped to a single definition of the "),a("code",[e._v("")]),e._v(", you can configure an inner bean definition, as the following example follows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n \n\n')])])]),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("Using both a "),a("code",[e._v("ref")]),e._v(" attribute and an inner handler definition in the same "),a("code",[e._v("")]),e._v(" configuration is not allowed, as it creates an ambiguous condition and results in an exception being thrown.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("If the "),a("code",[e._v("ref")]),e._v(" attribute references a bean that extends "),a("code",[e._v("AbstractMessageProducingHandler")]),e._v(" (such as splitters provided by the framework itself), the configuration is optimized by injecting the output channel into the handler directly."),a("br"),e._v("In this case, each "),a("code",[e._v("ref")]),e._v(" must be a separate bean instance (or a "),a("code",[e._v("prototype")]),e._v("-scoped bean) or use the inner "),a("code",[e._v("")]),e._v(" configuration type."),a("br"),e._v("However, this optimization applies only if you do not provide any splitter-specific attributes in the splitter XML definition."),a("br"),e._v("If you inadvertently reference the same message handler from multiple beans, you 
get a configuration exception.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("h4",{attrs:{id:"configuring-a-splitter-with-annotations"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#configuring-a-splitter-with-annotations"}},[e._v("#")]),e._v(" Configuring a Splitter with Annotations")]),e._v(" "),a("p",[e._v("The "),a("code",[e._v("@Splitter")]),e._v(" annotation is applicable to methods that expect either the "),a("code",[e._v("Message")]),e._v(" type or the message payload type, and the return values of the method should be a "),a("code",[e._v("Collection")]),e._v(" of any type.\nIf the returned values are not actual "),a("code",[e._v("Message")]),e._v(" objects, each item is wrapped in a "),a("code",[e._v("Message")]),e._v(" as the payload of the "),a("code",[e._v("Message")]),e._v(".\nEach resulting "),a("code",[e._v("Message")]),e._v(" is sent to the designated output channel for the endpoint on which the "),a("code",[e._v("@Splitter")]),e._v(" is defined.")]),e._v(" "),a("p",[e._v("The following example shows how to configure a splitter by using the "),a("code",[e._v("@Splitter")]),e._v(" annotation:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("@Splitter\nList extractItems(Order order) {\n return order.getItems()\n}\n")])])]),a("p",[e._v("See also "),a("RouterLink",{attrs:{to:"/en/spring-integration/handler-advice.html#advising-with-annotations"}},[e._v("Advising Endpoints Using Annotations")]),e._v(", "),a("RouterLink",{attrs:{to:"/en/spring-integration/dsl.html#java-dsl-splitters"}},[e._v("Splitters")]),e._v(" and "),a("RouterLink",{attrs:{to:"/en/spring-integration/file.html#file-splitter"}},[e._v("File Splitter")]),e._v(".")],1),e._v(" "),a("h3",{attrs:{id:"aggregator"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#aggregator"}},[e._v("#")]),e._v(" Aggregator")]),e._v(" "),a("p",[e._v("Basically a mirror-image of the splitter, the aggregator is a type of message 
handler that receives multiple messages and combines them into a single message.\nIn fact, an aggregator is often a downstream consumer in a pipeline that includes a splitter.")]),e._v(" "),a("p",[e._v("Technically, the aggregator is more complex than a splitter, because it is stateful.\nIt must hold the messages to be aggregated and determine when the complete group of messages is ready to be aggregated.\nIn order to do so, it requires a "),a("code",[e._v("MessageStore")]),e._v(".")]),e._v(" "),a("h4",{attrs:{id:"functionality"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#functionality"}},[e._v("#")]),e._v(" Functionality")]),e._v(" "),a("p",[e._v("The Aggregator combines a group of related messages, by correlating and storing them, until the group is deemed to be complete.\nAt that point, the aggregator creates a single message by processing the whole group and sends the aggregated message as output.")]),e._v(" "),a("p",[e._v("Implementing an aggregator requires providing the logic to perform the aggregation (that is, the creation of a single message from many).\nTwo related concepts are correlation and release.")]),e._v(" "),a("p",[e._v("Correlation determines how messages are grouped for aggregation.\nIn Spring Integration, correlation is done by default, based on the "),a("code",[e._v("IntegrationMessageHeaderAccessor.CORRELATION_ID")]),e._v(" message header.\nMessages with the same "),a("code",[e._v("IntegrationMessageHeaderAccessor.CORRELATION_ID")]),e._v(" are grouped together.\nHowever, you can customize the correlation strategy to allow other ways of specifying how the messages should be grouped together.\nTo do so, you can implement a "),a("code",[e._v("CorrelationStrategy")]),e._v(" (covered later in this chapter).")]),e._v(" "),a("p",[e._v("To determine the point at which a group of messages is ready to be processed, a "),a("code",[e._v("ReleaseStrategy")]),e._v(" is consulted.\nThe default release strategy for the aggregator releases a group 
when all messages included in a sequence are present, based on the "),a("code",[e._v("IntegrationMessageHeaderAccessor.SEQUENCE_SIZE")]),e._v(" header.\nYou can override this default strategy by providing a reference to a custom "),a("code",[e._v("ReleaseStrategy")]),e._v(" implementation.")]),e._v(" "),a("h4",{attrs:{id:"programming-model-2"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#programming-model-2"}},[e._v("#")]),e._v(" Programming Model")]),e._v(" "),a("p",[e._v("The Aggregation API consists of a number of classes:")]),e._v(" "),a("ul",[a("li",[a("p",[e._v("The interface "),a("code",[e._v("MessageGroupProcessor")]),e._v(", and its subclasses: "),a("code",[e._v("MethodInvokingAggregatingMessageGroupProcessor")]),e._v(" and "),a("code",[e._v("ExpressionEvaluatingMessageGroupProcessor")])])]),e._v(" "),a("li",[a("p",[e._v("The "),a("code",[e._v("ReleaseStrategy")]),e._v(" interface and its default implementation: "),a("code",[e._v("SimpleSequenceSizeReleaseStrategy")])])]),e._v(" "),a("li",[a("p",[e._v("The "),a("code",[e._v("CorrelationStrategy")]),e._v(" interface and its default implementation: "),a("code",[e._v("HeaderAttributeCorrelationStrategy")])])])]),e._v(" "),a("h5",{attrs:{id:"aggregatingmessagehandler"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#aggregatingmessagehandler"}},[e._v("#")]),e._v(" "),a("code",[e._v("AggregatingMessageHandler")])]),e._v(" "),a("p",[e._v("The "),a("code",[e._v("AggregatingMessageHandler")]),e._v(" (a subclass of "),a("code",[e._v("AbstractCorrelatingMessageHandler")]),e._v(") is a "),a("code",[e._v("MessageHandler")]),e._v(" implementation, encapsulating the common functionality of an aggregator (and other correlating use cases), which are as follows:")]),e._v(" "),a("ul",[a("li",[a("p",[e._v("Correlating messages into a group to be aggregated")])]),e._v(" "),a("li",[a("p",[e._v("Maintaining those messages in a "),a("code",[e._v("MessageStore")]),e._v(" until the group can be released")])]),e._v(" 
"),a("li",[a("p",[e._v("Deciding when the group can be released")])]),e._v(" "),a("li",[a("p",[e._v("Aggregating the released group into a single message")])]),e._v(" "),a("li",[a("p",[e._v("Recognizing and responding to an expired group")])])]),e._v(" "),a("p",[e._v("The responsibility for deciding how the messages should be grouped together is delegated to a "),a("code",[e._v("CorrelationStrategy")]),e._v(" instance.\nThe responsibility for deciding whether the message group can be released is delegated to a "),a("code",[e._v("ReleaseStrategy")]),e._v(" instance.")]),e._v(" "),a("p",[e._v("The following listing shows a brief highlight of the base "),a("code",[e._v("AbstractAggregatingMessageGroupProcessor")]),e._v(" (the responsibility for implementing the "),a("code",[e._v("aggregatePayloads")]),e._v(" method is left to the developer):")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("public abstract class AbstractAggregatingMessageGroupProcessor\n implements MessageGroupProcessor {\n\n protected Map aggregateHeaders(MessageGroup group) {\n // default implementation exists\n }\n\n protected abstract Object aggregatePayloads(MessageGroup group, Map defaultHeaders);\n\n}\n")])])]),a("p",[e._v("See "),a("code",[e._v("DefaultAggregatingMessageGroupProcessor")]),e._v(", "),a("code",[e._v("ExpressionEvaluatingMessageGroupProcessor")]),e._v(" and "),a("code",[e._v("MethodInvokingMessageGroupProcessor")]),e._v(" as out-of-the-box implementations of the "),a("code",[e._v("AbstractAggregatingMessageGroupProcessor")]),e._v(".")]),e._v(" "),a("p",[e._v("Starting with version 5.2, a "),a("code",[e._v("Function>")]),e._v(" strategy is available for the "),a("code",[e._v("AbstractAggregatingMessageGroupProcessor")]),e._v(" to merge and compute (aggregate) headers for an output message.\nThe "),a("code",[e._v("DefaultAggregateHeadersFunction")]),e._v(" implementation is available with logic that returns 
all headers that have no conflicts among the group; an absent header on one or more messages within the group is not considered a conflict.\nConflicting headers are omitted.\nAlong with the newly introduced "),a("code",[e._v("DelegatingMessageGroupProcessor")]),e._v(", this function is used for any arbitrary (non-"),a("code",[e._v("AbstractAggregatingMessageGroupProcessor")]),e._v(") "),a("code",[e._v("MessageGroupProcessor")]),e._v(" implementation.\nEssentially, the framework injects a provided function into an "),a("code",[e._v("AbstractAggregatingMessageGroupProcessor")]),e._v(" instance and wraps all other implementations into a "),a("code",[e._v("DelegatingMessageGroupProcessor")]),e._v(".\nThe difference in logic between the "),a("code",[e._v("AbstractAggregatingMessageGroupProcessor")]),e._v(" and the "),a("code",[e._v("DelegatingMessageGroupProcessor")]),e._v(" that the latter doesn’t compute headers in advance, before calling the delegate strategy, and doesn’t invoke the function if the delegate returns a "),a("code",[e._v("Message")]),e._v(" or "),a("code",[e._v("AbstractIntegrationMessageBuilder")]),e._v(".\nIn that case, the framework assumes that the target implementation has taken care of producing a proper set of headers populated into the returned result.\nThe "),a("code",[e._v("Function>")]),e._v(" strategy is available as the "),a("code",[e._v("headers-function")]),e._v(" reference attribute for XML configuration, as the "),a("code",[e._v("AggregatorSpec.headersFunction()")]),e._v(" option for the Java DSL and as "),a("code",[e._v("AggregatorFactoryBean.setHeadersFunction()")]),e._v(" for plain Java configuration.")]),e._v(" "),a("p",[e._v("The "),a("code",[e._v("CorrelationStrategy")]),e._v(" is owned by the "),a("code",[e._v("AbstractCorrelatingMessageHandler")]),e._v(" and has a default value based on the "),a("code",[e._v("IntegrationMessageHeaderAccessor.CORRELATION_ID")]),e._v(" message header, as the following example shows:")]),e._v(" 
"),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,\n CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {\n ...\n this.correlationStrategy = correlationStrategy == null ?\n new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;\n this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;\n ...\n}\n")])])]),a("p",[e._v("As for the actual processing of the message group, the default implementation is the "),a("code",[e._v("DefaultAggregatingMessageGroupProcessor")]),e._v(".\nIt creates a single "),a("code",[e._v("Message")]),e._v(" whose payload is a "),a("code",[e._v("List")]),e._v(" of the payloads received for a given group.\nThis works well for simple scatter-gather implementations with a splitter, a publish-subscribe channel, or a recipient list router upstream.")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("When using a publish-subscribe channel or a recipient list router in this type of scenario, be sure to enable the "),a("code",[e._v("apply-sequence")]),e._v(" flag."),a("br"),e._v("Doing so adds the necessary headers: "),a("code",[e._v("CORRELATION_ID")]),e._v(", "),a("code",[e._v("SEQUENCE_NUMBER")]),e._v(", and "),a("code",[e._v("SEQUENCE_SIZE")]),e._v("."),a("br"),e._v("That behavior is enabled by default for splitters in Spring Integration, but it is not enabled for publish-subscribe channels or for recipient list routers because those components may be used in a variety of contexts in which these headers are not necessary.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("When implementing a specific aggregator strategy for an application, you can extend "),a("code",[e._v("AbstractAggregatingMessageGroupProcessor")]),e._v(" and 
implement the "),a("code",[e._v("aggregatePayloads")]),e._v(" method.\nHowever, there are better solutions, less coupled to the API, for implementing the aggregation logic, which can be configured either through XML or through annotations.")]),e._v(" "),a("p",[e._v("In general, any POJO can implement the aggregation algorithm if it provides a method that accepts a single "),a("code",[e._v("java.util.List")]),e._v(" as an argument (parameterized lists are supported as well).\nThis method is invoked for aggregating messages as follows:")]),e._v(" "),a("ul",[a("li",[a("p",[e._v("If the argument is a "),a("code",[e._v("java.util.Collection")]),e._v(" and the parameter type T is assignable to "),a("code",[e._v("Message")]),e._v(", the whole list of messages accumulated for aggregation is sent to the aggregator.")])]),e._v(" "),a("li",[a("p",[e._v("If the argument is a non-parameterized "),a("code",[e._v("java.util.Collection")]),e._v(" or the parameter type is not assignable to "),a("code",[e._v("Message")]),e._v(", the method receives the payloads of the accumulated messages.")])]),e._v(" "),a("li",[a("p",[e._v("If the return type is not assignable to "),a("code",[e._v("Message")]),e._v(", it is treated as the payload for a "),a("code",[e._v("Message")]),e._v(" that is automatically created by the framework.")])])]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("In the interest of code simplicity and promoting best practices such as low coupling, testability, and others, the preferred way of implementing the aggregation logic is through a POJO and using the XML or annotation support for configuring it in the application.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("Starting with version 5.3, after processing message group, an "),a("code",[e._v("AbstractCorrelatingMessageHandler")]),e._v(" performs a "),a("code",[e._v("MessageBuilder.popSequenceDetails()")]),e._v(" message headers modification for the proper splitter-aggregator scenario 
with several nested levels.\nIt is done only if the message group release result is not a collection of messages.\nIn that case a target "),a("code",[e._v("MessageGroupProcessor")]),e._v(" is responsible for the "),a("code",[e._v("MessageBuilder.popSequenceDetails()")]),e._v(" call while building those messages.")]),e._v(" "),a("p",[e._v("If the "),a("code",[e._v("MessageGroupProcessor")]),e._v(" returns a "),a("code",[e._v("Message")]),e._v(", a "),a("code",[e._v("MessageBuilder.popSequenceDetails()")]),e._v(" will be performed on the output message only if the "),a("code",[e._v("sequenceDetails")]),e._v(" matches with first message in group.\n(Previously this has been done only if a plain payload or an "),a("code",[e._v("AbstractIntegrationMessageBuilder")]),e._v(" has been returned from the "),a("code",[e._v("MessageGroupProcessor")]),e._v(".)")]),e._v(" "),a("p",[e._v("This functionality can be controlled by a new "),a("code",[e._v("popSequence")]),e._v(" "),a("code",[e._v("boolean")]),e._v(" property, so the "),a("code",[e._v("MessageBuilder.popSequenceDetails()")]),e._v(" can be disabled in some scenarios when correlation details have not been populated by the standard splitter.\nThis property, essentially, undoes what has been done by the nearest upstream "),a("code",[e._v("applySequence = true")]),e._v(" in the "),a("code",[e._v("AbstractMessageSplitter")]),e._v(".\nSee "),a("RouterLink",{attrs:{to:"/en/spring-integration/splitter.html#splitter"}},[e._v("Splitter")]),e._v(" for more information.")],1),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("The "),a("code",[e._v("SimpleMessageGroup.getMessages()")]),e._v(" method returns an "),a("code",[e._v("unmodifiableCollection")]),e._v("."),a("br"),e._v("Therefore, if your aggregating POJO method has a "),a("code",[e._v("Collection")]),e._v(" parameter, the argument passed in is exactly that "),a("code",[e._v("Collection")]),e._v(" instance and, when you use a 
"),a("code",[e._v("SimpleMessageStore")]),e._v(" for the aggregator, that original "),a("code",[e._v("Collection")]),e._v(" is cleared after releasing the group."),a("br"),e._v("Consequently, the "),a("code",[e._v("Collection")]),e._v(" variable in the POJO is cleared too, if it is passed out of the aggregator."),a("br"),e._v("If you wish to simply release that collection as-is for further processing, you must build a new "),a("code",[e._v("Collection")]),e._v(" (for example, "),a("code",[e._v("new ArrayList(messages)")]),e._v(")."),a("br"),e._v("Starting with version 4.3, the framework no longer copies the messages to a new collection, to avoid undesired extra object creation.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("If the "),a("code",[e._v("processMessageGroup")]),e._v(" method of the "),a("code",[e._v("MessageGroupProcessor")]),e._v(" returns a collection, it must be a collection of "),a("code",[e._v("Message")]),e._v(" objects.\nIn this case, the messages are individually released.\nPrior to version 4.2, it was not possible to provide a "),a("code",[e._v("MessageGroupProcessor")]),e._v(" by using XML configuration.\nOnly POJO methods could be used for aggregation.\nNow, if the framework detects that the referenced (or inner) bean implements "),a("code",[e._v("MessageProcessor")]),e._v(", it is used as the aggregator’s output processor.")]),e._v(" "),a("p",[e._v("If you wish to release a collection of objects from a custom "),a("code",[e._v("MessageGroupProcessor")]),e._v(" as the payload of a message, your class should extend "),a("code",[e._v("AbstractAggregatingMessageGroupProcessor")]),e._v(" and implement "),a("code",[e._v("aggregatePayloads()")]),e._v(".")]),e._v(" "),a("p",[e._v("Also, since version 4.2, a "),a("code",[e._v("SimpleMessageGroupProcessor")]),e._v(" is provided.\nIt returns the collection of messages from the group, which, as indicated earlier, causes the released messages to be sent individually.")]),e._v(" 
"),a("p",[e._v("This lets the aggregator work as a message barrier, where arriving messages are held until the release strategy fires and the group is released as a sequence of individual messages.")]),e._v(" "),a("h5",{attrs:{id:"releasestrategy"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#releasestrategy"}},[e._v("#")]),e._v(" "),a("code",[e._v("ReleaseStrategy")])]),e._v(" "),a("p",[e._v("The "),a("code",[e._v("ReleaseStrategy")]),e._v(" interface is defined as follows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("public interface ReleaseStrategy {\n\n boolean canRelease(MessageGroup group);\n\n}\n")])])]),a("p",[e._v("In general, any POJO can implement the completion decision logic if it provides a method that accepts a single "),a("code",[e._v("java.util.List")]),e._v(" as an argument (parameterized lists are supported as well) and returns a boolean value.\nThis method is invoked after the arrival of each new message, to decide whether the group is complete or not, as follows:")]),e._v(" "),a("ul",[a("li",[a("p",[e._v("If the argument is a "),a("code",[e._v("java.util.List")]),e._v(" and the parameter type "),a("code",[e._v("T")]),e._v(" is assignable to "),a("code",[e._v("Message")]),e._v(", the whole list of messages accumulated in the group is sent to the method.")])]),e._v(" "),a("li",[a("p",[e._v("If the argument is a non-parametrized "),a("code",[e._v("java.util.List")]),e._v(" or the parameter type is not assignable to "),a("code",[e._v("Message")]),e._v(", the method receives the payloads of the accumulated messages.")])]),e._v(" "),a("li",[a("p",[e._v("The method must return "),a("code",[e._v("true")]),e._v(" if the message group is ready for aggregation or false otherwise.")])])]),e._v(" "),a("p",[e._v("The following example shows how to use the "),a("code",[e._v("@ReleaseStrategy")]),e._v(" annotation for a "),a("code",[e._v("List")]),e._v(" of type 
"),a("code",[e._v("Message")]),e._v(":")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("public class MyReleaseStrategy {\n\n @ReleaseStrategy\n public boolean canMessagesBeReleased(List>) {...}\n}\n")])])]),a("p",[e._v("The following example shows how to use the "),a("code",[e._v("@ReleaseStrategy")]),e._v(" annotation for a "),a("code",[e._v("List")]),e._v(" of type "),a("code",[e._v("String")]),e._v(":")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("public class MyReleaseStrategy {\n\n @ReleaseStrategy\n public boolean canMessagesBeReleased(List) {...}\n}\n")])])]),a("p",[e._v("Based on the signatures in the preceding two examples, the POJO-based release strategy is passed a "),a("code",[e._v("Collection")]),e._v(" of not-yet-released messages (if you need access to the whole "),a("code",[e._v("Message")]),e._v(") or a "),a("code",[e._v("Collection")]),e._v(" of payload objects (if the type parameter is anything other than "),a("code",[e._v("Message")]),e._v(").\nThis satisfies the majority of use cases.\nHowever if, for some reason, you need to access the full "),a("code",[e._v("MessageGroup")]),e._v(", you should provide an implementation of the "),a("code",[e._v("ReleaseStrategy")]),e._v(" interface.")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("When handling potentially large groups, you should understand how these methods are invoked, because the release strategy may be invoked multiple times before the group is released."),a("br"),e._v("The most efficient is an implementation of "),a("code",[e._v("ReleaseStrategy")]),e._v(", because the aggregator can invoke it directly."),a("br"),e._v("The second most efficient is a POJO method with a "),a("code",[e._v("Collection>")]),e._v(" parameter type."),a("br"),e._v("The least efficient is a POJO method with a 
"),a("code",[e._v("Collection")]),e._v(" type."),a("br"),e._v("The framework has to copy the payloads from the messages in the group into a new collection (and possibly attempt conversion on the payloads to "),a("code",[e._v("Something")]),e._v(") every time the release strategy is called."),a("br"),e._v("Using "),a("code",[e._v("Collection")]),e._v(" avoids the conversion but still requires creating the new "),a("code",[e._v("Collection")]),e._v("."),a("br"),a("br"),e._v("For these reasons, for large groups, we recommended that you implement "),a("code",[e._v("ReleaseStrategy")]),e._v(".")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("When the group is released for aggregation, all its not-yet-released messages are processed and removed from the group.\nIf the group is also complete (that is, if all messages from a sequence have arrived or if there is no sequence defined), then the group is marked as complete.\nAny new messages for this group are sent to the discard channel (if defined).\nSetting "),a("code",[e._v("expire-groups-upon-completion")]),e._v(" to "),a("code",[e._v("true")]),e._v(" (the default is "),a("code",[e._v("false")]),e._v(") removes the entire group, and any new messages (with the same correlation ID as the removed group) form a new group.\nYou can release partial sequences by using a "),a("code",[e._v("MessageGroupStoreReaper")]),e._v(" together with "),a("code",[e._v("send-partial-result-on-expiry")]),e._v(" being set to "),a("code",[e._v("true")]),e._v(".")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("To facilitate discarding of late-arriving messages, the aggregator must maintain state about the group after it has been released."),a("br"),e._v("This can eventually cause out-of-memory conditions."),a("br"),e._v("To avoid such situations, you should consider configuring a "),a("code",[e._v("MessageGroupStoreReaper")]),e._v(" to remove the group metadata."),a("br"),e._v("The expiry parameters should be set to 
expire groups once a point has been reached after which late messages are not expected to arrive."),a("br"),e._v("For information about configuring a reaper, see "),a("a",{attrs:{href:"#reaper"}},[e._v("Managing State in an Aggregator: "),a("code",[e._v("MessageGroupStore")])]),e._v(".")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("Spring Integration provides an implementation for "),a("code",[e._v("ReleaseStrategy")]),e._v(": "),a("code",[e._v("SimpleSequenceSizeReleaseStrategy")]),e._v(".\nThis implementation consults the "),a("code",[e._v("SEQUENCE_NUMBER")]),e._v(" and "),a("code",[e._v("SEQUENCE_SIZE")]),e._v(" headers of each arriving message to decide when a message group is complete and ready to be aggregated.\nAs shown earlier, it is also the default strategy.")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("Before version 5.0, the default release strategy was "),a("code",[e._v("SequenceSizeReleaseStrategy")]),e._v(", which does not perform well with large groups."),a("br"),e._v("With that strategy, duplicate sequence numbers are detected and rejected."),a("br"),e._v("This operation can be expensive.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("If you are aggregating large groups, you don’t need to release partial groups, and you don’t need to detect/reject duplicate sequences, consider using the "),a("code",[e._v("SimpleSequenceSizeReleaseStrategy")]),e._v(" instead - it is much more efficient for these use cases, and is the default since "),a("em",[e._v("version 5.0")]),e._v(" when partial group release is not specified.")]),e._v(" "),a("h5",{attrs:{id:"aggregating-large-groups"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#aggregating-large-groups"}},[e._v("#")]),e._v(" Aggregating Large Groups")]),e._v(" "),a("p",[e._v("The 4.3 release changed the default "),a("code",[e._v("Collection")]),e._v(" for messages in a "),a("code",[e._v("SimpleMessageGroup")]),e._v(" to 
"),a("code",[e._v("HashSet")]),e._v(" (it was previously a "),a("code",[e._v("BlockingQueue")]),e._v(").\nThis was expensive when removing individual messages from large groups (an O(n) linear scan was required).\nAlthough the hash set is generally much faster to remove, it can be expensive for large messages, because the hash has to be calculated on both inserts and removes.\nIf you have messages that are expensive to hash, consider using some other collection type.\nAs discussed in "),a("RouterLink",{attrs:{to:"/en/spring-integration/message-store.html#message-group-factory"}},[e._v("Using "),a("code",[e._v("MessageGroupFactory")])]),e._v(", a "),a("code",[e._v("SimpleMessageGroupFactory")]),e._v(" is provided so that you can select the "),a("code",[e._v("Collection")]),e._v(" that best suits your needs.\nYou can also provide your own factory implementation to create some other "),a("code",[e._v("Collection>")]),e._v(".")],1),e._v(" "),a("p",[e._v("The following example shows how to configure an aggregator with the previous implementation and a "),a("code",[e._v("SimpleSequenceSizeReleaseStrategy")]),e._v(":")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n\n\n \n \n \n \n \n\n\n\n')])])]),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("If the filter endpoint is involved in the flow upstream of an aggregator, the sequence size release strategy (fixed or based on the "),a("code",[e._v("sequenceSize")]),e._v(" header) is not going to serve its purpose because some messages from a sequence may be discarded by the filter."),a("br"),e._v("In this case it is recommended to choose another "),a("code",[e._v("ReleaseStrategy")]),e._v(", or use compensation messages sent from a discard sub-flow carrying some information in their content to be skipped in a custom complete group function."),a("br"),e._v("See 
"),a("RouterLink",{attrs:{to:"/en/spring-integration/filter.html#filter"}},[e._v("Filter")]),e._v(" for more information.")],1)])]),e._v(" "),a("tbody")]),e._v(" "),a("h5",{attrs:{id:"correlation-strategy"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#correlation-strategy"}},[e._v("#")]),e._v(" Correlation Strategy")]),e._v(" "),a("p",[e._v("The "),a("code",[e._v("CorrelationStrategy")]),e._v(" interface is defined as follows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("public interface CorrelationStrategy {\n\n Object getCorrelationKey(Message message);\n\n}\n")])])]),a("p",[e._v("The method returns an "),a("code",[e._v("Object")]),e._v(" that represents the correlation key used for associating the message with a message group.\nThe key must satisfy the criteria used for a key in a "),a("code",[e._v("Map")]),e._v(" with respect to the implementation of "),a("code",[e._v("equals()")]),e._v(" and "),a("code",[e._v("hashCode()")]),e._v(".")]),e._v(" "),a("p",[e._v("In general, any POJO can implement the correlation logic, and the rules for mapping a message to a method’s argument (or arguments) are the same as for a "),a("code",[e._v("ServiceActivator")]),e._v(" (including support for "),a("code",[e._v("@Header")]),e._v(" annotations).\nThe method must return a value, and the value must not be "),a("code",[e._v("null")]),e._v(".")]),e._v(" "),a("p",[e._v("Spring Integration provides an implementation for "),a("code",[e._v("CorrelationStrategy")]),e._v(": "),a("code",[e._v("HeaderAttributeCorrelationStrategy")]),e._v(".\nThis implementation returns the value of one of the message headers (whose name is specified by a constructor argument) as the correlation key.\nBy default, the correlation strategy is a "),a("code",[e._v("HeaderAttributeCorrelationStrategy")]),e._v(" that returns the value of the "),a("code",[e._v("CORRELATION_ID")]),e._v(" header attribute.\nIf you have a custom 
header name you would like to use for correlation, you can configure it on an instance of "),a("code",[e._v("HeaderAttributeCorrelationStrategy")]),e._v(" and provide that as a reference for the aggregator’s correlation strategy.")]),e._v(" "),a("h5",{attrs:{id:"lock-registry"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#lock-registry"}},[e._v("#")]),e._v(" Lock Registry")]),e._v(" "),a("p",[e._v("Changes to groups are thread safe.\nSo, when you send messages for the same correlation ID concurrently, only one of them is processed in the aggregator at a time, making it effectively "),a("strong",[e._v("single-threaded per message group")]),e._v(".\nA "),a("code",[e._v("LockRegistry")]),e._v(" is used to obtain a lock for the resolved correlation ID.\nA "),a("code",[e._v("DefaultLockRegistry")]),e._v(" is used by default (in-memory).\nFor synchronizing updates across servers where a shared "),a("code",[e._v("MessageGroupStore")]),e._v(" is being used, you must configure a shared lock registry.")]),e._v(" "),a("h5",{attrs:{id:"avoiding-deadlocks"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#avoiding-deadlocks"}},[e._v("#")]),e._v(" Avoiding Deadlocks")]),e._v(" "),a("p",[e._v("As discussed above, when message groups are mutated (messages added or released) a lock is held.")]),e._v(" "),a("p",[e._v("Consider the following flow:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("...->aggregator1-> ... 
->aggregator2-> ...\n")])])]),a("p",[e._v("If there are multiple threads, "),a("strong",[e._v("and the aggregators share a common lock registry")]),e._v(", it is possible to get a deadlock.\nThis will cause hung threads and "),a("code",[e._v("jstack ")]),e._v(" might present a result such as:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('Found one Java-level deadlock:\n=============================\n"t2":\n waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),\n which is held by "t1"\n"t1":\n waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),\n which is held by "t2"\n')])])]),a("p",[e._v("There are several ways to avoid this problem:")]),e._v(" "),a("ul",[a("li",[a("p",[e._v("ensure each aggregator has its own lock registry (this can be a shared registry across application instances but two or more aggregators in the flow must each have a distinct registry)")])]),e._v(" "),a("li",[a("p",[e._v("use an "),a("code",[e._v("ExecutorChannel")]),e._v(" or "),a("code",[e._v("QueueChannel")]),e._v(" as the output channel of the aggregator so that the downstream flow runs on a new thread")])]),e._v(" "),a("li",[a("p",[e._v("starting with version 5.1.1, set the "),a("code",[e._v("releaseLockBeforeSend")]),e._v(" aggregator property to "),a("code",[e._v("true")])])])]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("This problem can also be caused if, for some reason, the output of a single aggregator is eventually routed back to the same aggregator."),a("br"),e._v("Of course, the first solution above does not apply in this case.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("h4",{attrs:{id:"configuring-an-aggregator-in-java-dsl"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#configuring-an-aggregator-in-java-dsl"}},[e._v("#")]),e._v(" Configuring an 
Aggregator in Java DSL")]),e._v(" "),a("p",[e._v("See "),a("RouterLink",{attrs:{to:"/en/spring-integration/dsl.html#java-dsl-aggregators"}},[e._v("Aggregators and Resequencers")]),e._v(" for how to configure an aggregator in Java DSL.")],1),e._v(" "),a("h5",{attrs:{id:"configuring-an-aggregator-with-xml"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#configuring-an-aggregator-with-xml"}},[e._v("#")]),e._v(" Configuring an Aggregator with XML")]),e._v(" "),a("p",[e._v("Spring Integration supports the configuration of an aggregator with XML through the "),a("code",[e._v("<aggregator/>")]),e._v(" element.\nThe following example shows an aggregator configuration:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n\n (24)\n (25)\n (26)\n\n\n\n\n\n\n\n \n\n\n\n\n\n\n\n')])])]),a("table",[a("thead",[a("tr",[a("th",[a("strong",[e._v("1")])]),e._v(" "),a("th",[e._v("The id of the aggregator is optional.")])])]),e._v(" "),a("tbody",[a("tr",[a("td",[a("strong",[e._v("2")])]),e._v(" "),a("td",[e._v("Lifecycle attribute signaling whether the aggregator should be started during application context startup."),a("br"),e._v("Optional (the default is 'true').")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("3")])]),e._v(" "),a("td",[e._v("The channel from which the aggregator receives messages."),a("br"),e._v("Required.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("4")])]),e._v(" "),a("td",[e._v("The channel to which the aggregator sends the aggregation results."),a("br"),e._v("Optional (because incoming messages can themselves specify a reply channel in the 'replyChannel' message header).")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("5")])]),e._v(" "),a("td",[e._v("The channel to which the aggregator sends the messages that timed out (if "),a("code",[e._v("send-partial-result-on-expiry")]),e._v(" is "),a("code",[e._v("false")]),e._v(")."),a("br"),e._v("Optional.")])]),e._v(" 
"),a("tr",[a("td",[a("strong",[e._v("6")])]),e._v(" "),a("td",[e._v("A reference to a "),a("code",[e._v("MessageGroupStore")]),e._v(" used to store groups of messages under their correlation key until they are complete."),a("br"),e._v("Optional."),a("br"),e._v("By default, it is a volatile in-memory store."),a("br"),e._v("See "),a("RouterLink",{attrs:{to:"/en/spring-integration/message-store.html#message-store"}},[e._v("Message Store")]),e._v(" for more information.")],1)]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("7")])]),e._v(" "),a("td",[e._v("The order of this aggregator when more than one handle is subscribed to the same "),a("code",[e._v("DirectChannel")]),e._v(" (use for load-balancing purposes)."),a("br"),e._v("Optional.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("8")])]),e._v(" "),a("td",[e._v("Indicates that expired messages should be aggregated and sent to the 'output-channel' or 'replyChannel' once their containing "),a("code",[e._v("MessageGroup")]),e._v(" is expired (see "),a("a",{attrs:{href:"https://docs.spring.io/spring-integration/api/org/springframework/integration/store/MessageGroupStore.html#expireMessageGroups-long",target:"_blank",rel:"noopener noreferrer"}},[a("code",[e._v("MessageGroupStore.expireMessageGroups(long)")]),a("OutboundLink")],1),e._v(")."),a("br"),e._v("One way of expiring a "),a("code",[e._v("MessageGroup")]),e._v(" is by configuring a "),a("code",[e._v("MessageGroupStoreReaper")]),e._v("."),a("br"),e._v("However you can alternatively expire "),a("code",[e._v("MessageGroup")]),e._v(" by calling "),a("code",[e._v("MessageGroupStore.expireMessageGroups(timeout)")]),e._v("."),a("br"),e._v("You can accomplish that through a Control Bus operation or, if you have a reference to the "),a("code",[e._v("MessageGroupStore")]),e._v(" instance, by invoking "),a("code",[e._v("expireMessageGroups(timeout)")]),e._v("."),a("br"),e._v("Otherwise, by itself, this attribute does nothing."),a("br"),e._v("It serves only as an indicator of 
whether to discard or send to the output or reply channel any messages that are still in the "),a("code",[e._v("MessageGroup")]),e._v(" that is about to be expired."),a("br"),e._v("Optional (the default is "),a("code",[e._v("false")]),e._v(")."),a("br"),e._v("NOTE: This attribute might more properly be called "),a("code",[e._v("send-partial-result-on-timeout")]),e._v(", because the group may not actually expire if "),a("code",[e._v("expire-groups-upon-timeout")]),e._v(" is set to "),a("code",[e._v("false")]),e._v(".")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("9")])]),e._v(" "),a("td",[e._v("The timeout interval to wait when sending a reply "),a("code",[e._v("Message")]),e._v(" to the "),a("code",[e._v("output-channel")]),e._v(" or "),a("code",[e._v("discard-channel")]),e._v("."),a("br"),e._v("Defaults to "),a("code",[e._v("-1")]),e._v(", which results in blocking indefinitely."),a("br"),e._v("It is applied only if the output channel has some 'sending' limitations, such as a "),a("code",[e._v("QueueChannel")]),e._v(" with a fixed 'capacity'."),a("br"),e._v("In this case, a "),a("code",[e._v("MessageDeliveryException")]),e._v(" is thrown."),a("br"),e._v("For "),a("code",[e._v("AbstractSubscribableChannel")]),e._v(" implementations, the "),a("code",[e._v("send-timeout")]),e._v(" is ignored ."),a("br"),e._v("For "),a("code",[e._v("group-timeout(-expression)")]),e._v(", the "),a("code",[e._v("MessageDeliveryException")]),e._v(" from the scheduled expire task leads this task to be rescheduled."),a("br"),e._v("Optional.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("10")])]),e._v(" "),a("td",[e._v("A reference to a bean that implements the message correlation (grouping) algorithm."),a("br"),e._v("The bean can be an implementation of the "),a("code",[e._v("CorrelationStrategy")]),e._v(" interface or a POJO."),a("br"),e._v("In the latter case, the "),a("code",[e._v("correlation-strategy-method")]),e._v(" attribute must be defined as well."),a("br"),e._v("Optional 
(by default, the aggregator uses the "),a("code",[e._v("IntegrationMessageHeaderAccessor.CORRELATION_ID")]),e._v(" header).")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("11")])]),e._v(" "),a("td",[e._v("A method defined on the bean referenced by "),a("code",[e._v("correlation-strategy")]),e._v("."),a("br"),e._v("It implements the correlation decision algorithm."),a("br"),e._v("Optional, with restrictions ("),a("code",[e._v("correlation-strategy")]),e._v(" must be present).")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("12")])]),e._v(" "),a("td",[e._v("A SpEL expression representing the correlation strategy."),a("br"),e._v("Example: "),a("code",[e._v("\"headers['something']\"")]),e._v("."),a("br"),e._v("Only one of "),a("code",[e._v("correlation-strategy")]),e._v(" or "),a("code",[e._v("correlation-strategy-expression")]),e._v(" is allowed.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("13")])]),e._v(" "),a("td",[e._v("A reference to a bean defined in the application context."),a("br"),e._v("The bean must implement the aggregation logic, as described earlier."),a("br"),e._v("Optional (by default, the list of aggregated messages becomes a payload of the output message).")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("14")])]),e._v(" "),a("td",[e._v("A method defined on the bean referenced by the "),a("code",[e._v("ref")]),e._v(" attribute."),a("br"),e._v("It implements the message aggregation algorithm."),a("br"),e._v("Optional (it depends on "),a("code",[e._v("ref")]),e._v(" attribute being defined).")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("15")])]),e._v(" "),a("td",[e._v("A reference to a bean that implements the release strategy."),a("br"),e._v("The bean can be an implementation of the "),a("code",[e._v("ReleaseStrategy")]),e._v(" interface or a POJO."),a("br"),e._v("In the latter case, the "),a("code",[e._v("release-strategy-method")]),e._v(" attribute must be defined as well."),a("br"),e._v("Optional (by default, the aggregator uses the 
"),a("code",[e._v("IntegrationMessageHeaderAccessor.SEQUENCE_SIZE")]),e._v(" header attribute).")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("16")])]),e._v(" "),a("td",[e._v("A method defined on the bean referenced by the "),a("code",[e._v("release-strategy")]),e._v(" attribute."),a("br"),e._v("It implements the completion decision algorithm."),a("br"),e._v("Optional, with restrictions ("),a("code",[e._v("release-strategy")]),e._v(" must be present).")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("17")])]),e._v(" "),a("td",[e._v("A SpEL expression representing the release strategy."),a("br"),e._v("The root object for the expression is a "),a("code",[e._v("MessageGroup")]),e._v("."),a("br"),e._v("Example: "),a("code",[e._v('"size() == 5"')]),e._v("."),a("br"),e._v("Only one of "),a("code",[e._v("release-strategy")]),e._v(" or "),a("code",[e._v("release-strategy-expression")]),e._v(" is allowed.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("18")])]),e._v(" "),a("td",[e._v("When set to "),a("code",[e._v("true")]),e._v(" (the default is "),a("code",[e._v("false")]),e._v("), completed groups are removed from the message store, letting subsequent messages with the same correlation form a new group."),a("br"),e._v("The default behavior is to send messages with the same correlation as a completed group to the "),a("code",[e._v("discard-channel")]),e._v(".")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("19")])]),e._v(" "),a("td",[e._v("Applies only if a "),a("code",[e._v("MessageGroupStoreReaper")]),e._v(" is configured for the "),a("code",[e._v("MessageStore")]),e._v(" of the "),a("code",[e._v("")]),e._v("."),a("br"),e._v("By default, when a "),a("code",[e._v("MessageGroupStoreReaper")]),e._v(" is configured to expire partial groups, empty groups are also removed."),a("br"),e._v("Empty groups exist after a group is normally released."),a("br"),e._v("The empty groups enable the detection and discarding of late-arriving messages."),a("br"),e._v("If you wish to 
expire empty groups on a longer schedule than expiring partial groups, set this property."),a("br"),e._v("Empty groups are then not removed from the "),a("code",[e._v("MessageStore")]),e._v(" until they have not been modified for at least this number of milliseconds."),a("br"),e._v("Note that the actual time to expire an empty group is also affected by the reaper’s "),a("code",[e._v("timeout")]),e._v(" property, and it could be as much as this value plus the timeout.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("20")])]),e._v(" "),a("td",[e._v("A reference to a "),a("code",[e._v("org.springframework.integration.support.locks.LockRegistry")]),e._v(" bean."),a("br"),e._v("It is used to obtain a "),a("code",[e._v("Lock")]),e._v(" based on the "),a("code",[e._v("groupId")]),e._v(" for concurrent operations on the "),a("code",[e._v("MessageGroup")]),e._v("."),a("br"),e._v("By default, an internal "),a("code",[e._v("DefaultLockRegistry")]),e._v(" is used."),a("br"),e._v("Use of a distributed "),a("code",[e._v("LockRegistry")]),e._v(", such as the "),a("code",[e._v("ZookeeperLockRegistry")]),e._v(", ensures only one instance of the aggregator can operate on a group at a time."),a("br"),e._v("See "),a("RouterLink",{attrs:{to:"/en/spring-integration/redis.html#redis-lock-registry"}},[e._v("Redis Lock Registry")]),e._v(", "),a("RouterLink",{attrs:{to:"/en/spring-integration/gemfire.html#gemfire-lock-registry"}},[e._v("Gemfire Lock Registry")]),e._v(", and "),a("RouterLink",{attrs:{to:"/en/spring-integration/zookeeper.html#zk-lock-registry"}},[e._v("Zookeeper Lock Registry")]),e._v(" for more information.")],1)]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("21")])]),e._v(" "),a("td",[e._v("A timeout (in milliseconds) to force the "),a("code",[e._v("MessageGroup")]),e._v(" to complete when the "),a("code",[e._v("ReleaseStrategy")]),e._v(" does not release the group when the current message arrives."),a("br"),e._v("This attribute provides a built-in time-based release strategy for the 
aggregator when there is a need to emit a partial result (or discard the group) if a new message does not arrive for the "),a("code",[e._v("MessageGroup")]),e._v(" within the timeout which counts from the time the last message arrived."),a("br"),e._v("To set up a timeout which counts from the time the "),a("code",[e._v("MessageGroup")]),e._v(" was created, see the "),a("code",[e._v("group-timeout-expression")]),e._v(" information."),a("br"),e._v("When a new message arrives at the aggregator, any existing "),a("code",[e._v("ScheduledFuture")]),e._v(" for its "),a("code",[e._v("MessageGroup")]),e._v(" is canceled."),a("br"),e._v("If the "),a("code",[e._v("ReleaseStrategy")]),e._v(" returns "),a("code",[e._v("false")]),e._v(" (meaning do not release) and "),a("code",[e._v("groupTimeout > 0")]),e._v(", a new task is scheduled to expire the group."),a("br"),e._v("We do not advise setting this attribute to zero (or a negative value)."),a("br"),e._v("Doing so effectively disables the aggregator, because every message group is immediately completed."),a("br"),e._v("You can, however, conditionally set it to zero (or a negative value) by using an expression."),a("br"),e._v("See "),a("code",[e._v("group-timeout-expression")]),e._v(" for information."),a("br"),e._v("The action taken during the completion depends on the "),a("code",[e._v("ReleaseStrategy")]),e._v(" and the "),a("code",[e._v("send-partial-result-on-expiry")]),e._v(" attribute."),a("br"),e._v("See "),a("a",{attrs:{href:"#agg-and-group-to"}},[e._v("Aggregator and Group Timeout")]),e._v(" for more information."),a("br"),e._v("It is mutually exclusive with the 'group-timeout-expression' attribute.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("22")])]),e._v(" "),a("td",[e._v("The SpEL expression that evaluates to a "),a("code",[e._v("groupTimeout")]),e._v(" with the "),a("code",[e._v("MessageGroup")]),e._v(" as the "),a("code",[e._v("#root")]),e._v(" evaluation context object."),a("br"),e._v("Used for scheduling the 
"),a("code",[e._v("MessageGroup")]),e._v(" to be forced complete."),a("br"),e._v("If the expression evaluates to "),a("code",[e._v("null")]),e._v(", the completion is not scheduled."),a("br"),e._v("If it evaluates to zero, the group is completed immediately on the current thread."),a("br"),e._v("In effect, this provides a dynamic "),a("code",[e._v("group-timeout")]),e._v(" property."),a("br"),e._v("As an example, if you wish to forcibly complete a "),a("code",[e._v("MessageGroup")]),e._v(" after 10 seconds have elapsed since the time the group was created you might consider using the following SpEL expression: "),a("code",[e._v("timestamp + 10000 - T(System).currentTimeMillis()")]),e._v(" where "),a("code",[e._v("timestamp")]),e._v(" is provided by "),a("code",[e._v("MessageGroup.getTimestamp()")]),e._v(" as the "),a("code",[e._v("MessageGroup")]),e._v(" here is the "),a("code",[e._v("#root")]),e._v(" evaluation context object."),a("br"),e._v("Bear in mind however that the group creation time might differ from the time of the first arrived message depending on other group expiration properties' configuration."),a("br"),e._v("See "),a("code",[e._v("group-timeout")]),e._v(" for more information."),a("br"),e._v("Mutually exclusive with 'group-timeout' attribute.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("23")])]),e._v(" "),a("td",[e._v("When a group is completed due to a timeout (or by a "),a("code",[e._v("MessageGroupStoreReaper")]),e._v("), the group is expired (completely removed) by default."),a("br"),e._v("Late arriving messages start a new group."),a("br"),e._v("You can set this to "),a("code",[e._v("false")]),e._v(" to complete the group but have its metadata remain so that late arriving messages are discarded."),a("br"),e._v("Empty groups can be expired later using a "),a("code",[e._v("MessageGroupStoreReaper")]),e._v(" together with the "),a("code",[e._v("empty-group-min-timeout")]),e._v(" attribute."),a("br"),e._v("It defaults to 'true'.")])]),e._v(" 
"),a("tr",[a("td",[a("strong",[e._v("24")])]),e._v(" "),a("td",[e._v("A "),a("code",[e._v("TaskScheduler")]),e._v(" bean reference to schedule the "),a("code",[e._v("MessageGroup")]),e._v(" to be forced complete if no new message arrives for the "),a("code",[e._v("MessageGroup")]),e._v(" within the "),a("code",[e._v("groupTimeout")]),e._v("."),a("br"),e._v("If not provided, the default scheduler ("),a("code",[e._v("taskScheduler")]),e._v(") registered in the "),a("code",[e._v("ApplicationContext")]),e._v(" ("),a("code",[e._v("ThreadPoolTaskScheduler")]),e._v(") is used."),a("br"),e._v("This attribute does not apply if "),a("code",[e._v("group-timeout")]),e._v(" or "),a("code",[e._v("group-timeout-expression")]),e._v(" is not specified.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("25")])]),e._v(" "),a("td",[e._v("Since version 4.1."),a("br"),e._v("It lets a transaction be started for the "),a("code",[e._v("forceComplete")]),e._v(" operation."),a("br"),e._v("It is initiated from a "),a("code",[e._v("group-timeout(-expression)")]),e._v(" or by a "),a("code",[e._v("MessageGroupStoreReaper")]),e._v(" and is not applied to the normal "),a("code",[e._v("add")]),e._v(", "),a("code",[e._v("release")]),e._v(", and "),a("code",[e._v("discard")]),e._v(" operations."),a("br"),e._v("Only this sub-element or "),a("code",[e._v("")]),e._v(" is allowed.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("26")])]),e._v(" "),a("td",[e._v("Since "),a("em",[e._v("version 4.1")]),e._v("."),a("br"),e._v("It allows the configuration of any "),a("code",[e._v("Advice")]),e._v(" for the "),a("code",[e._v("forceComplete")]),e._v(" operation."),a("br"),e._v("It is initiated from a "),a("code",[e._v("group-timeout(-expression)")]),e._v(" or by a "),a("code",[e._v("MessageGroupStoreReaper")]),e._v(" and is not applied to the normal "),a("code",[e._v("add")]),e._v(", "),a("code",[e._v("release")]),e._v(", and "),a("code",[e._v("discard")]),e._v(" operations."),a("br"),e._v("Only this 
sub-element or "),a("code",[e._v("")]),e._v(" is allowed."),a("br"),e._v("A transaction "),a("code",[e._v("Advice")]),e._v(" can also be configured here by using the Spring "),a("code",[e._v("tx")]),e._v(" namespace.")])])])]),e._v(" "),a("p",[e._v("| |Expiring Groups"),a("br"),a("br"),e._v("There are two attributes related to expiring (completely removing) groups."),a("br"),e._v("When a group is expired, there is no record of it, and, if a new message arrives with the same correlation, a new group is started."),a("br"),e._v("When a group is completed (without expiry), the empty group remains and late-arriving messages are discarded."),a("br"),e._v("Empty groups can be removed later by using a "),a("code",[e._v("MessageGroupStoreReaper")]),e._v(" in combination with the "),a("code",[e._v("empty-group-min-timeout")]),e._v(" attribute."),a("br"),a("br"),a("code",[e._v("expire-groups-upon-completion")]),e._v(" relates to “normal” completion when the "),a("code",[e._v("ReleaseStrategy")]),e._v(" releases the group."),a("br"),e._v("This defaults to "),a("code",[e._v("false")]),e._v("."),a("br"),a("br"),e._v("If a group is not completed normally but is released or discarded because of a timeout, the group is normally expired."),a("br"),e._v("Since version 4.1, you can control this behavior by using "),a("code",[e._v("expire-groups-upon-timeout")]),e._v("."),a("br"),e._v("It defaults to "),a("code",[e._v("true")]),e._v(" for backwards compatibility."),a("br"),a("br"),e._v("| |When a group is timed out, the "),a("code",[e._v("ReleaseStrategy")]),e._v(" is given one more opportunity to release the group."),a("br"),e._v("If it does so and "),a("code",[e._v("expire-groups-upon-timeout")]),e._v(" is false, expiration is controlled by "),a("code",[e._v("expire-groups-upon-completion")]),e._v("."),a("br"),e._v("If the group is not released by the release strategy during timeout, then the expiration is controlled by the 
"),a("code",[e._v("expire-groups-upon-timeout")]),e._v("."),a("br"),e._v("Timed-out groups are either discarded or a partial release occurs (based on "),a("code",[e._v("send-partial-result-on-expiry")]),e._v(").|"),a("br"),e._v("|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|"),a("br"),a("br"),e._v("Since version 5.0, empty groups are also scheduled for removal after "),a("code",[e._v("empty-group-min-timeout")]),e._v("."),a("br"),e._v("If "),a("code",[e._v("expireGroupsUponCompletion == false")]),e._v(" and "),a("code",[e._v("minimumTimeoutForEmptyGroups > 0")]),e._v(", the task to remove the group is scheduled when normal or partial sequences release happens."),a("br"),a("br"),e._v("Starting with version 5.4, the aggregator (and resequencer) can be configured to expire orphaned groups (groups in a persistent message store that might not otherwise be released)."),a("br"),e._v("The "),a("code",[e._v("expireTimeout")]),e._v(" (if greater than "),a("code",[e._v("0")]),e._v(") indicates that groups older than this value in the store should be purged."),a("br"),e._v("The "),a("code",[e._v("purgeOrphanedGroups()")]),e._v(" method is called on start up and, together with the provided "),a("code",[e._v("expireDuration")]),e._v(", periodically within a scheduled task."),a("br"),e._v("This method is also can be called externally at any time."),a("br"),e._v("The expiration logic is fully delegated to the "),a("code",[e._v("forceComplete(MessageGroup)")]),e._v(" functionality according to the provided expiration options mentioned 
above."),a("br"),e._v("Such periodic purge functionality is useful when a message store needs to be cleaned up of old groups that are no longer going to be released by the regular message arrival logic."),a("br"),e._v("In most cases this happens after an application restart, when using a persistent message group store."),a("br"),e._v("The functionality is similar to the "),a("code",[e._v("MessageGroupStoreReaper")]),e._v(" with a scheduled task, but provides a convenient way to deal with old groups within specific components, when using group timeout instead of a reaper."),a("br"),e._v("The "),a("code",[e._v("MessageGroupStore")]),e._v(" must be provided exclusively for the current correlation endpoint."),a("br"),e._v("Otherwise one aggregator may purge groups from another."),a("br"),e._v("With the aggregator, groups expired using this technique will either be discarded or released as a partial group, depending on the "),a("code",[e._v("expireGroupsUponCompletion")]),e._v(" 
property.|\n|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|\n| | When a group is timed out, the "),a("code",[e._v("ReleaseStrategy")]),e._v(" is given one more opportunity to release the group."),a("br"),e._v("If it does so and "),a("code",[e._v("expire-groups-upon-timeout")]),e._v(" is false, expiration is controlled by 
"),a("code",[e._v("expire-groups-upon-completion")]),e._v("."),a("br"),e._v("If the group is not released by the release strategy during timeout, then the expiration is controlled by the "),a("code",[e._v("expire-groups-upon-timeout")]),e._v("."),a("br"),e._v("Timed-out groups are either discarded or a partial release occurs (based on "),a("code",[e._v("send-partial-result-on-expiry")]),e._v("). |")]),e._v(" "),a("p",[e._v("We generally recommend using a "),a("code",[e._v("ref")]),e._v(" attribute if a custom aggregator handler implementation may be referenced in other "),a("code",[e._v("")]),e._v(" definitions.\nHowever, if a custom aggregator implementation is only being used by a single definition of the "),a("code",[e._v("")]),e._v(", you can use an inner bean definition (starting with version 1.0.3) to configure the aggregation POJO within the "),a("code",[e._v("")]),e._v(" element, as the following example shows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n \n\n')])])]),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("Using both a "),a("code",[e._v("ref")]),e._v(" attribute and an inner bean definition in the same "),a("code",[e._v("")]),e._v(" configuration is not allowed, as it creates an ambiguous condition."),a("br"),e._v("In such cases, an Exception is thrown.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("The following example shows an implementation of the aggregator bean:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("public class PojoAggregator {\n\n public Long add(List results) {\n long total = 0l;\n for (long partialResult: results) {\n total += partialResult;\n }\n return total;\n }\n}\n")])])]),a("p",[e._v("An implementation of the completion strategy bean for the preceding example might be as follows:")]),e._v(" "),a("div",{staticClass:"language- 
extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("public class PojoReleaseStrategy {\n...\n public boolean canRelease(List numbers) {\n int sum = 0;\n for (long number: numbers) {\n sum += number;\n }\n return sum >= maxValue;\n }\n}\n")])])]),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("Wherever it makes sense to do so, the release strategy method and the aggregator method can be combined into a single bean.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("An implementation of the correlation strategy bean for the example above might be as follows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("public class PojoCorrelationStrategy {\n...\n public Long groupNumbersByLastDigit(Long number) {\n return number % 10;\n }\n}\n")])])]),a("p",[e._v("The aggregator in the preceding example would group numbers by some criterion (in this case, the remainder after dividing by ten) and hold the group until the sum of the numbers provided by the payloads exceeds a certain value.")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("Wherever it makes sense to do so, the release strategy method, the correlation strategy method, and the aggregator method can be combined in a single bean."),a("br"),e._v("(Actually, all of them or any two of them can be combined.)")])])]),e._v(" "),a("tbody")]),e._v(" "),a("h6",{attrs:{id:"-3"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#-3"}},[e._v("#")])]),e._v(" "),a("p",[e._v("Since Spring Integration 2.0, you can handle the various strategies (correlation, release, and aggregation) with "),a("a",{attrs:{href:"https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#expressions",target:"_blank",rel:"noopener noreferrer"}},[e._v("SpEL"),a("OutboundLink")],1),e._v(", which we recommend if the logic behind such a release strategy is relatively simple.\nSuppose you have a 
legacy component that was designed to receive an array of objects.\nWe know that the default release strategy assembles all aggregated messages in the "),a("code",[e._v("List")]),e._v(".\nNow we have two problems.\nFirst, we need to extract individual messages from the list.\nSecond, we need to extract the payload of each message and assemble the array of objects.\nThe following example solves both problems:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("public String[] processRelease(List> messages){\n List stringList = new ArrayList();\n for (Message message : messages) {\n stringList.add(message.getPayload());\n }\n return stringList.toArray(new String[]{});\n}\n")])])]),a("p",[e._v("However, with SpEL, such a requirement could actually be handled relatively easily with a one-line expression, thus sparing you from writing a custom class and configuring it as a bean.\nThe following example shows how to do so:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n')])])]),a("p",[e._v("In the preceding configuration, we use a "),a("a",{attrs:{href:"https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#expressions",target:"_blank",rel:"noopener noreferrer"}},[e._v("collection projection"),a("OutboundLink")],1),e._v(" expression to assemble a new collection from the payloads of all the messages in the list and then transform it to an array, thus achieving the same result as the earlier Java code.")]),e._v(" "),a("p",[e._v("You can apply the same expression-based approach when dealing with custom release and correlation strategies.")]),e._v(" "),a("p",[e._v("Instead of defining a bean for a custom "),a("code",[e._v("CorrelationStrategy")]),e._v(" in the "),a("code",[e._v("correlation-strategy")]),e._v(" attribute, you can implement your simple correlation logic as a SpEL expression and 
configure it in the "),a("code",[e._v("correlation-strategy-expression")]),e._v(" attribute, as the following example shows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('correlation-strategy-expression="payload.person.id"\n')])])]),a("p",[e._v("In the preceding example, we assume that the payload has a "),a("code",[e._v("person")]),e._v(" attribute with an "),a("code",[e._v("id")]),e._v(", which is going to be used to correlate messages.")]),e._v(" "),a("p",[e._v("Likewise, for the "),a("code",[e._v("ReleaseStrategy")]),e._v(", you can implement your release logic as a SpEL expression and configure it in the "),a("code",[e._v("release-strategy-expression")]),e._v(" attribute.\nThe root object for evaluation context is the "),a("code",[e._v("MessageGroup")]),e._v(" itself.\nThe "),a("code",[e._v("List")]),e._v(" of messages can be referenced by using the "),a("code",[e._v("message")]),e._v(" property of the group within the expression.")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("In releases prior to version 5.0, the root object was the collection of "),a("code",[e._v("Message")]),e._v(", as the previous example shows:")])])]),e._v(" "),a("tbody")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('release-strategy-expression="!messages.?[payload==5].empty"\n')])])]),a("p",[e._v("In the preceding example, the root object of the SpEL evaluation context is the "),a("code",[e._v("MessageGroup")]),e._v(" itself, and you are stating that, as soon as there is a message with payload of "),a("code",[e._v("5")]),e._v(" in this group, the group should be released.")]),e._v(" "),a("h6",{attrs:{id:"aggregator-and-group-timeout"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#aggregator-and-group-timeout"}},[e._v("#")]),e._v(" Aggregator and Group Timeout")]),e._v(" "),a("p",[e._v("Starting with 
version 4.0, two new mutually exclusive attributes have been introduced: "),a("code",[e._v("group-timeout")]),e._v(" and "),a("code",[e._v("group-timeout-expression")]),e._v(".\nSee "),a("a",{attrs:{href:"#aggregator-xml"}},[e._v("Configuring an Aggregator with XML")]),e._v(".\nIn some cases, you may need to emit the aggregator result (or discard the group) after a timeout if the "),a("code",[e._v("ReleaseStrategy")]),e._v(" does not release when the current message arrives.\nFor this purpose, the "),a("code",[e._v("groupTimeout")]),e._v(" option lets you schedule the "),a("code",[e._v("MessageGroup")]),e._v(" to be forced to complete, as the following example shows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n')])])]),a("p",[e._v("With this example, the normal release is possible if the aggregator receives the last message in sequence as defined by the "),a("code",[e._v("release-strategy-expression")]),e._v(".\nIf that specific message does not arrive, the "),a("code",[e._v("groupTimeout")]),e._v(" forces the group to complete after ten seconds, as long as the group contains at least two Messages.")]),e._v(" "),a("p",[e._v("The result of forcing the group to complete depends on the "),a("code",[e._v("ReleaseStrategy")]),e._v(" and the "),a("code",[e._v("send-partial-result-on-expiry")]),e._v(".\nFirst, the release strategy is again consulted to see if a normal release is to be made.\nAlthough the group has not changed, the "),a("code",[e._v("ReleaseStrategy")]),e._v(" can decide to release the group at this time.\nIf the release strategy still does not release the group, it is expired.\nIf "),a("code",[e._v("send-partial-result-on-expiry")]),e._v(" is "),a("code",[e._v("true")]),e._v(", existing messages in the (partial) "),a("code",[e._v("MessageGroup")]),e._v(" are released as a normal aggregator reply message to the "),a("code",[e._v("output-channel")]),e._v(".\nOtherwise, it is 
discarded.")]),e._v(" "),a("p",[e._v("There is a difference between "),a("code",[e._v("groupTimeout")]),e._v(" behavior and "),a("code",[e._v("MessageGroupStoreReaper")]),e._v(" (see "),a("a",{attrs:{href:"#aggregator-xml"}},[e._v("Configuring an Aggregator with XML")]),e._v(").\nThe reaper initiates forced completion for all "),a("code",[e._v("MessageGroup")]),e._v(" s in the "),a("code",[e._v("MessageGroupStore")]),e._v(" periodically.\nThe "),a("code",[e._v("groupTimeout")]),e._v(" does it for each "),a("code",[e._v("MessageGroup")]),e._v(" individually if a new message does not arrive during the "),a("code",[e._v("groupTimeout")]),e._v(".\nAlso, the reaper can be used to remove empty groups (empty groups are retained in order to discard late messages if "),a("code",[e._v("expire-groups-upon-completion")]),e._v(" is false).")]),e._v(" "),a("p",[e._v("Starting with version 5.5, the "),a("code",[e._v("groupTimeoutExpression")]),e._v(" can be evaluated to a "),a("code",[e._v("java.util.Date")]),e._v(" instance.\nThis can be useful in cases like determining a scheduled task moment based on the group creation time ("),a("code",[e._v("MessageGroup.getTimestamp()")]),e._v(") instead of a current message arrival as it is calculated when "),a("code",[e._v("groupTimeoutExpression")]),e._v(" is evaluated to "),a("code",[e._v("long")]),e._v(":")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('group-timeout-expression="size() ge 2 ? 
new java.util.Date(timestamp + 200) : null"\n')])])]),a("h5",{attrs:{id:"configuring-an-aggregator-with-annotations"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#configuring-an-aggregator-with-annotations"}},[e._v("#")]),e._v(" Configuring an Aggregator with Annotations")]),e._v(" "),a("p",[e._v("The following example shows an aggregator configured with annotations:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("public class Waiter {\n ...\n\n @Aggregator (1)\n public Delivery aggregatingMethod(List items) {\n ...\n }\n\n @ReleaseStrategy (2)\n public boolean releaseChecker(List> messages) {\n ...\n }\n\n @CorrelationStrategy (3)\n public String correlateBy(OrderItem item) {\n ...\n }\n}\n")])])]),a("table",[a("thead",[a("tr",[a("th",[a("strong",[e._v("1")])]),e._v(" "),a("th",[e._v("An annotation indicating that this method should be used as an aggregator."),a("br"),e._v("It must be specified if this class is used as an aggregator.")])])]),e._v(" "),a("tbody",[a("tr",[a("td",[a("strong",[e._v("2")])]),e._v(" "),a("td",[e._v("An annotation indicating that this method is used as the release strategy of an aggregator."),a("br"),e._v("If not present on any method, the aggregator uses the "),a("code",[e._v("SimpleSequenceSizeReleaseStrategy")]),e._v(".")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("3")])]),e._v(" "),a("td",[e._v("An annotation indicating that this method should be used as the correlation strategy of an aggregator."),a("br"),e._v("If no correlation strategy is indicated, the aggregator uses the "),a("code",[e._v("HeaderAttributeCorrelationStrategy")]),e._v(" based on "),a("code",[e._v("CORRELATION_ID")]),e._v(".")])])])]),e._v(" "),a("p",[e._v("All of the configuration options provided by the XML element are also available for the "),a("code",[e._v("@Aggregator")]),e._v(" annotation.")]),e._v(" "),a("p",[e._v("The aggregator can be either referenced explicitly from 
XML or, if the "),a("code",[e._v("@MessageEndpoint")]),e._v(" is defined on the class, detected automatically through classpath scanning.")]),e._v(" "),a("p",[e._v("Annotation configuration ("),a("code",[e._v("@Aggregator")]),e._v(" and others) for the Aggregator component covers only simple use cases, where most default options are sufficient.\nIf you need more control over those options when using annotation configuration, consider using a "),a("code",[e._v("@Bean")]),e._v(" definition for the "),a("code",[e._v("AggregatingMessageHandler")]),e._v(" and mark its "),a("code",[e._v("@Bean")]),e._v(" method with "),a("code",[e._v("@ServiceActivator")]),e._v(", as the following example shows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@ServiceActivator(inputChannel = "aggregatorChannel")\n@Bean\npublic MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {\n AggregatingMessageHandler aggregator =\n new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),\n jdbcMessageGroupStore);\n aggregator.setOutputChannel(resultsChannel());\n aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));\n aggregator.setTaskScheduler(this.taskScheduler);\n return aggregator;\n}\n')])])]),a("p",[e._v("See "),a("a",{attrs:{href:"#aggregator-api"}},[e._v("Programming Model")]),e._v(" and "),a("RouterLink",{attrs:{to:"/en/spring-integration/configuration.html#annotations_on_beans"}},[e._v("Annotations on "),a("code",[e._v("@Bean")]),e._v(" Methods")]),e._v(" for more information.")],1),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("Starting with version 4.2, the "),a("code",[e._v("AggregatorFactoryBean")]),e._v(" is available to simplify Java configuration for the "),a("code",[e._v("AggregatingMessageHandler")]),e._v(".")])])]),e._v(" "),a("tbody")]),e._v(" 
"),a("h4",{attrs:{id:"managing-state-in-an-aggregator-messagegroupstore"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#managing-state-in-an-aggregator-messagegroupstore"}},[e._v("#")]),e._v(" Managing State in an Aggregator: "),a("code",[e._v("MessageGroupStore")])]),e._v(" "),a("p",[e._v("Aggregator (and some other patterns in Spring Integration) is a stateful pattern that requires decisions to be made based on a group of messages that have arrived over a period of time, all with the same correlation key.\nThe design of the interfaces in the stateful patterns (such as "),a("code",[e._v("ReleaseStrategy")]),e._v(") is driven by the principle that the components (whether defined by the framework or by a user) should be able to remain stateless.\nAll state is carried by the "),a("code",[e._v("MessageGroup")]),e._v(" and its management is delegated to the "),a("code",[e._v("MessageGroupStore")]),e._v(".\nThe "),a("code",[e._v("MessageGroupStore")]),e._v(" interface is defined as follows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("public interface MessageGroupStore {\n\n int getMessageCountForAllMessageGroups();\n\n int getMarkedMessageCountForAllMessageGroups();\n\n int getMessageGroupCount();\n\n MessageGroup getMessageGroup(Object groupId);\n\n MessageGroup addMessageToGroup(Object groupId, Message message);\n\n MessageGroup markMessageGroup(MessageGroup group);\n\n MessageGroup removeMessageFromGroup(Object key, Message messageToRemove);\n\n MessageGroup markMessageFromGroup(Object key, Message messageToMark);\n\n void removeMessageGroup(Object groupId);\n\n void registerMessageGroupExpiryCallback(MessageGroupCallback callback);\n\n int expireMessageGroups(long timeout);\n}\n")])])]),a("p",[e._v("For more information, see the 
"),a("a",{attrs:{href:"https://docs.spring.io/spring-integration/api/org/springframework/integration/store/MessageGroupStore.html",target:"_blank",rel:"noopener noreferrer"}},[e._v("Javadoc"),a("OutboundLink")],1),e._v(".")]),e._v(" "),a("p",[e._v("The "),a("code",[e._v("MessageGroupStore")]),e._v(" accumulates state information in "),a("code",[e._v("MessageGroups")]),e._v(" while waiting for a release strategy to be triggered, and that event might not ever happen.\nSo, to prevent stale messages from lingering, and for volatile stores to provide a hook for cleaning up when the application shuts down, the "),a("code",[e._v("MessageGroupStore")]),e._v(" lets you register callbacks to apply to its "),a("code",[e._v("MessageGroups")]),e._v(" when they expire.\nThe interface is very straightforward, as the following listing shows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("public interface MessageGroupCallback {\n\n void execute(MessageGroupStore messageGroupStore, MessageGroup group);\n\n}\n")])])]),a("p",[e._v("The callback has direct access to the store and the message group so that it can manage the persistent state (for example, by entirely removing the group from the store).")]),e._v(" "),a("p",[e._v("The "),a("code",[e._v("MessageGroupStore")]),e._v(" maintains a list of these callbacks, which it applies, on demand, to all messages whose timestamps are earlier than a time supplied as a parameter (see the "),a("code",[e._v("registerMessageGroupExpiryCallback(..)")]),e._v(" and "),a("code",[e._v("expireMessageGroups(..)")]),e._v(" methods, described earlier).")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("It is important not to use the same "),a("code",[e._v("MessageGroupStore")]),e._v(" instance in different aggregator components, when you intend to rely on the "),a("code",[e._v("expireMessageGroups")]),e._v(" functionality."),a("br"),e._v("Every 
"),a("code",[e._v("AbstractCorrelatingMessageHandler")]),e._v(" registers its own "),a("code",[e._v("MessageGroupCallback")]),e._v(" based on the "),a("code",[e._v("forceComplete()")]),e._v(" callback."),a("br"),e._v("This way each group for expiration may be completed or discarded by the wrong aggregator."),a("br"),e._v("Starting with version 5.0.10, a "),a("code",[e._v("UniqueExpiryCallback")]),e._v(" is used from the "),a("code",[e._v("AbstractCorrelatingMessageHandler")]),e._v(" for the registration callback in the "),a("code",[e._v("MessageGroupStore")]),e._v("."),a("br"),e._v("The "),a("code",[e._v("MessageGroupStore")]),e._v(", in turn, checks for presence an instance of this class and logs an error with an appropriate message if one is already present in the callbacks set."),a("br"),e._v("This way the Framework disallows usage of the "),a("code",[e._v("MessageGroupStore")]),e._v(" instance in different aggregators/resequencers to avoid the mentioned side effect of expiration the groups not created by the particular correlation handler.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("You can call the "),a("code",[e._v("expireMessageGroups")]),e._v(" method with a timeout value.\nAny message older than the current time minus this value is expired and has the callbacks applied.\nThus, it is the user of the store that defines what is meant by message group “expiry”.")]),e._v(" "),a("p",[e._v("As a convenience for users, Spring Integration provides a wrapper for the message expiry in the form of a "),a("code",[e._v("MessageGroupStoreReaper")]),e._v(", as the following example shows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n \n \n\n\n\n \n\n')])])]),a("p",[e._v("The reaper is a "),a("code",[e._v("Runnable")]),e._v(".\nIn the preceding example, the message group store’s expire method is called every ten seconds.\nThe timeout itself is 30 seconds.")]),e._v(" 
"),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("It is important to understand that the 'timeout' property of "),a("code",[e._v("MessageGroupStoreReaper")]),e._v(" is an approximate value and is impacted by the rate of the task scheduler, since this property is only checked on the next scheduled execution of the "),a("code",[e._v("MessageGroupStoreReaper")]),e._v(" task."),a("br"),e._v("For example, if the timeout is set for ten minutes but the "),a("code",[e._v("MessageGroupStoreReaper")]),e._v(" task is scheduled to run every hour and the last execution of the "),a("code",[e._v("MessageGroupStoreReaper")]),e._v(" task happened one minute before the timeout, the "),a("code",[e._v("MessageGroup")]),e._v(" does not expire for the next 59 minutes."),a("br"),e._v("Consequently, we recommend setting the rate to be at least equal to the value of the timeout or shorter.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("In addition to the reaper, the expiry callbacks are invoked when the application shuts down through a lifecycle callback in the "),a("code",[e._v("AbstractCorrelatingMessageHandler")]),e._v(".")]),e._v(" "),a("p",[e._v("The "),a("code",[e._v("AbstractCorrelatingMessageHandler")]),e._v(" registers its own expiry callback, and this is the link with the boolean flag "),a("code",[e._v("send-partial-result-on-expiry")]),e._v(" in the XML configuration of the aggregator.\nIf the flag is set to "),a("code",[e._v("true")]),e._v(", then, when the expiry callback is invoked, any unmarked messages in groups that are not yet released can be sent on to the output channel.")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("Since the "),a("code",[e._v("MessageGroupStoreReaper")]),e._v(" is called from a scheduled task, and may result in the production of a message (depending on the "),a("code",[e._v("sendPartialResultOnExpiry")]),e._v(" option) to a downstream integration flow, it is recommended to supply a custom 
"),a("code",[e._v("TaskScheduler")]),e._v(" with a "),a("code",[e._v("MessagePublishingErrorHandler")]),e._v(" to handler exceptions via an "),a("code",[e._v("errorChannel")]),e._v(", as it might be expected by the regular aggregator release functionality."),a("br"),e._v("The same logic applies for group timeout functionality which also relies on a "),a("code",[e._v("TaskScheduler")]),e._v("."),a("br"),e._v("See "),a("RouterLink",{attrs:{to:"/en/spring-integration/error-handling.html#error-handling"}},[e._v("Error Handling")]),e._v(" for more information.")],1)])]),e._v(" "),a("tbody")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("When a shared "),a("code",[e._v("MessageStore")]),e._v(" is used for different correlation endpoints, you must configure a proper "),a("code",[e._v("CorrelationStrategy")]),e._v(" to ensure uniqueness for group IDs."),a("br"),e._v("Otherwise, unexpected behavior may happen when one correlation endpoint releases or expire messages from others."),a("br"),e._v("Messages with the same correlation key are stored in the same message group."),a("br"),a("br"),e._v("Some "),a("code",[e._v("MessageStore")]),e._v(" implementations allow using the same physical resources, by partitioning the data."),a("br"),e._v("For example, the "),a("code",[e._v("JdbcMessageStore")]),e._v(" has a "),a("code",[e._v("region")]),e._v(" property, and the "),a("code",[e._v("MongoDbMessageStore")]),e._v(" has a "),a("code",[e._v("collectionName")]),e._v(" property."),a("br"),a("br"),e._v("For more information about the "),a("code",[e._v("MessageStore")]),e._v(" interface and its implementations, see "),a("RouterLink",{attrs:{to:"/en/spring-integration/message-store.html#message-store"}},[e._v("Message Store")]),e._v(".")],1)])]),e._v(" "),a("tbody")]),e._v(" "),a("h4",{attrs:{id:"flux-aggregator"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#flux-aggregator"}},[e._v("#")]),e._v(" Flux Aggregator")]),e._v(" "),a("p",[e._v("In version 5.2, 
the "),a("code",[e._v("FluxAggregatorMessageHandler")]),e._v(" component has been introduced.\nIt is based on the Project Reactor "),a("code",[e._v("Flux.groupBy()")]),e._v(" and "),a("code",[e._v("Flux.window()")]),e._v(" operators.\nThe incoming messages are emitted into the "),a("code",[e._v("FluxSink")]),e._v(" initiated by the "),a("code",[e._v("Flux.create()")]),e._v(" in the constructor of this component.\nIf the "),a("code",[e._v("outputChannel")]),e._v(" is not provided or it is not an instance of "),a("code",[e._v("ReactiveStreamsSubscribableChannel")]),e._v(", the subscription to the main "),a("code",[e._v("Flux")]),e._v(" is done from the "),a("code",[e._v("Lifecycle.start()")]),e._v(" implementation.\nOtherwise it is postponed to the subscription done by the "),a("code",[e._v("ReactiveStreamsSubscribableChannel")]),e._v(" implementation.\nThe messages are grouped by the "),a("code",[e._v("Flux.groupBy()")]),e._v(" using a "),a("code",[e._v("CorrelationStrategy")]),e._v(" for the group key.\nBy default, the "),a("code",[e._v("IntegrationMessageHeaderAccessor.CORRELATION_ID")]),e._v(" header of the message is consulted.")]),e._v(" "),a("p",[e._v("By default every closed window is released as a "),a("code",[e._v("Flux")]),e._v(" in the payload of the message to produce.\nThis message contains all the headers from the first message in the window.\nThis "),a("code",[e._v("Flux")]),e._v(" in the output message payload must be subscribed to and processed downstream.\nSuch logic can be customized (or superseded) by the "),a("code",[e._v("setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>)")]),e._v(" configuration option of the "),a("code",[e._v("FluxAggregatorMessageHandler")]),e._v(".\nFor example, if we would like to have a "),a("code",[e._v("List")]),e._v(" of payloads in the final message, we can configure a "),a("code",[e._v("Flux.collectList()")]),e._v(" like this:")]),e._v(" "),a("div",{staticClass:"language- 
extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v("fluxAggregatorMessageHandler.setCombineFunction(\n (messageFlux) ->\n messageFlux\n .map(Message::getPayload)\n .collectList()\n .map(GenericMessage::new));\n")])])]),a("p",[e._v("There are several options in the "),a("code",[e._v("FluxAggregatorMessageHandler")]),e._v(" to select an appropriate window strategy:")]),e._v(" "),a("ul",[a("li",[a("p",[a("code",[e._v("setBoundaryTrigger(Predicate>)")]),e._v(" - is propagated to the "),a("code",[e._v("Flux.windowUntil()")]),e._v(" operator.\nSee its JavaDocs for more information.\nHas a precedence over all other window options.")])]),e._v(" "),a("li",[a("p",[a("code",[e._v("setWindowSize(int)")]),e._v(" and "),a("code",[e._v("setWindowSizeFunction(Function, Integer>)")]),e._v(" - is propagated to the "),a("code",[e._v("Flux.window(int)")]),e._v(" or "),a("code",[e._v("windowTimeout(int, Duration)")]),e._v(".\nBy default a window size is calculated from the first message in group and its "),a("code",[e._v("IntegrationMessageHeaderAccessor.SEQUENCE_SIZE")]),e._v(" header.")])]),e._v(" "),a("li",[a("p",[a("code",[e._v("setWindowTimespan(Duration)")]),e._v(" - is propagated to the "),a("code",[e._v("Flux.window(Duration)")]),e._v(" or "),a("code",[e._v("windowTimeout(int, Duration)")]),e._v(" depending in the window size configuration.")])]),e._v(" "),a("li",[a("p",[a("code",[e._v("setWindowConfigurer(Function>, Flux>>>)")]),e._v(" - a function to apply a transformation into the grouped fluxes for any custom window operation not covered by the exposed options.")])])]),e._v(" "),a("p",[e._v("Since this component is a "),a("code",[e._v("MessageHandler")]),e._v(" implementation it can simply be used as a "),a("code",[e._v("@Bean")]),e._v(" definition together with a "),a("code",[e._v("@ServiceActivator")]),e._v(" messaging annotation.\nWith Java DSL it can be used from the "),a("code",[e._v(".handle()")]),e._v(" EIP-method.\nThe sample below 
demonstrates how we can register an "),a("code",[e._v("IntegrationFlow")]),e._v(" at runtime and how a "),a("code",[e._v("FluxAggregatorMessageHandler")]),e._v(" can be correlated with a splitter upstream:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('IntegrationFlow fluxFlow =\n (flow) -> flow\n .split()\n .channel(MessageChannels.flux())\n .handle(new FluxAggregatorMessageHandler());\n\nIntegrationFlowContext.IntegrationFlowRegistration registration =\n this.integrationFlowContext.registration(fluxFlow)\n .register();\n\n@SuppressWarnings("unchecked")\nFlux> window =\n registration.getMessagingTemplate()\n .convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);\n')])])]),a("h4",{attrs:{id:"condition-on-the-message-group"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#condition-on-the-message-group"}},[e._v("#")]),e._v(" Condition on the Message Group")]),e._v(" "),a("p",[e._v("Starting with version 5.5, an "),a("code",[e._v("AbstractCorrelatingMessageHandler")]),e._v(" (including its Java & XML DSLs) exposes a "),a("code",[e._v("groupConditionSupplier")]),e._v(" option of the "),a("code",[e._v("BiFunction, String, String>")]),e._v(" implementation.\nThis function is used on each message added to the group and a result condition sentence is stored into the group for future consideration.\nThe "),a("code",[e._v("ReleaseStrategy")]),e._v(" may consult this condition instead of iterating over all the messages in the group.\nSee "),a("code",[e._v("GroupConditionProvider")]),e._v(" JavaDocs and "),a("RouterLink",{attrs:{to:"/en/spring-integration/message-store.html#message-group-condition"}},[e._v("Message Group Condition")]),e._v(" for more information.")],1),e._v(" "),a("p",[e._v("See also "),a("RouterLink",{attrs:{to:"/en/spring-integration/file.html#file-aggregator"}},[e._v("File Aggregator")]),e._v(".")],1),e._v(" 
"),a("h3",{attrs:{id:"resequencer"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#resequencer"}},[e._v("#")]),e._v(" Resequencer")]),e._v(" "),a("p",[e._v("The resequencer is related to the aggregator but serves a different purpose.\nWhile the aggregator combines messages, the resequencer passes messages through without changing them.")]),e._v(" "),a("h4",{attrs:{id:"functionality-2"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#functionality-2"}},[e._v("#")]),e._v(" Functionality")]),e._v(" "),a("p",[e._v("The resequencer works in a similar way to the aggregator, in the sense that it uses the "),a("code",[e._v("CORRELATION_ID")]),e._v(" to store messages in groups.\nThe difference is that the Resequencer does not process the messages in any way.\nInstead, it releases them in the order of their "),a("code",[e._v("SEQUENCE_NUMBER")]),e._v(" header values.")]),e._v(" "),a("p",[e._v("With respect to that, you can opt to release all messages at once (after the whole sequence, according to the "),a("code",[e._v("SEQUENCE_SIZE")]),e._v(', and other possibilities) or as soon as a valid sequence is available.\n(We cover what we mean by "a valid sequence" later in this chapter.)')]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("The resequencer is intended to resequence relatively short sequences of messages with small gaps."),a("br"),e._v("If you have a large number of disjoint sequences with many gaps, you may experience performance issues.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("h4",{attrs:{id:"configuring-a-resequencer"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#configuring-a-resequencer"}},[e._v("#")]),e._v(" Configuring a Resequencer")]),e._v(" "),a("p",[e._v("See "),a("RouterLink",{attrs:{to:"/en/spring-integration/dsl.html#java-dsl-aggregators"}},[e._v("Aggregators and Resequencers")]),e._v(" for configuring a resequencer in Java DSL.")],1),e._v(" "),a("p",[e._v("Configuring a resequencer requires only including the 
appropriate element in XML.")]),e._v(" "),a("p",[e._v("The following example shows a resequencer configuration:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n\n\n\n (19)\n expire-group-upon-timeout="false" /> (20)\n')])])]),a("table",[a("thead",[a("tr",[a("th",[a("strong",[e._v("1")])]),e._v(" "),a("th",[e._v("The id of the resequencer is optional.")])])]),e._v(" "),a("tbody",[a("tr",[a("td",[a("strong",[e._v("2")])]),e._v(" "),a("td",[e._v("The input channel of the resequencer."),a("br"),e._v("Required.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("3")])]),e._v(" "),a("td",[e._v("The channel to which the resequencer sends the reordered messages."),a("br"),e._v("Optional.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("4")])]),e._v(" "),a("td",[e._v("The channel to which the resequencer sends the messages that timed out (if "),a("code",[e._v("send-partial-result-on-timeout")]),e._v(" is set to "),a("code",[e._v("false")]),e._v(")."),a("br"),e._v("Optional.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("5")])]),e._v(" "),a("td",[e._v("Whether to send out ordered sequences as soon as they are available or only after the whole message group arrives."),a("br"),e._v("Optional."),a("br"),e._v("(The default is "),a("code",[e._v("false")]),e._v(".)")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("6")])]),e._v(" "),a("td",[e._v("A reference to a "),a("code",[e._v("MessageGroupStore")]),e._v(" that can be used to store groups of messages under their correlation key until they are complete."),a("br"),e._v("Optional."),a("br"),e._v("(The default is a volatile in-memory store.)")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("7")])]),e._v(" "),a("td",[e._v("Whether, upon the expiration of the group, the ordered group should be sent out (even if some of the messages are missing)."),a("br"),e._v("Optional."),a("br"),e._v("(The default is false.)"),a("br"),e._v("See 
"),a("RouterLink",{attrs:{to:"/en/spring-integration/aggregator.html#reaper"}},[e._v("Managing State in an Aggregator: "),a("code",[e._v("MessageGroupStore")])]),e._v(".")],1)]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("8")])]),e._v(" "),a("td",[e._v("The timeout interval to wait when sending a reply "),a("code",[e._v("Message")]),e._v(" to the "),a("code",[e._v("output-channel")]),e._v(" or "),a("code",[e._v("discard-channel")]),e._v("."),a("br"),e._v("Defaults to "),a("code",[e._v("-1")]),e._v(", which blocks indefinitely."),a("br"),e._v("It is applied only if the output channel has some 'sending' limitations, such as a "),a("code",[e._v("QueueChannel")]),e._v(" with a fixed 'capacity'."),a("br"),e._v("In this case, a "),a("code",[e._v("MessageDeliveryException")]),e._v(" is thrown."),a("br"),e._v("The "),a("code",[e._v("send-timeout")]),e._v(" is ignored for "),a("code",[e._v("AbstractSubscribableChannel")]),e._v(" implementations."),a("br"),e._v("For "),a("code",[e._v("group-timeout(-expression)")]),e._v(", the "),a("code",[e._v("MessageDeliveryException")]),e._v(" from the scheduled expire task leads this task to be rescheduled."),a("br"),e._v("Optional.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("9")])]),e._v(" "),a("td",[e._v("A reference to a bean that implements the message correlation (grouping) algorithm."),a("br"),e._v("The bean can be an implementation of the "),a("code",[e._v("CorrelationStrategy")]),e._v(" interface or a POJO."),a("br"),e._v("In the latter case, the "),a("code",[e._v("correlation-strategy-method")]),e._v(" attribute must also be defined."),a("br"),e._v("Optional."),a("br"),e._v("(By default, the aggregator uses the "),a("code",[e._v("IntegrationMessageHeaderAccessor.CORRELATION_ID")]),e._v(" header.)")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("10")])]),e._v(" "),a("td",[e._v("A method that is defined on the bean referenced by "),a("code",[e._v("correlation-strategy")]),e._v(" and that implements the correlation decision 
algorithm."),a("br"),e._v("Optional, with restrictions (requires "),a("code",[e._v("correlation-strategy")]),e._v(" to be present).")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("11")])]),e._v(" "),a("td",[e._v("A SpEL expression representing the correlation strategy."),a("br"),e._v("Example: "),a("code",[e._v("\"headers['something']\"")]),e._v("."),a("br"),e._v("Only one of "),a("code",[e._v("correlation-strategy")]),e._v(" or "),a("code",[e._v("correlation-strategy-expression")]),e._v(" is allowed.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("12")])]),e._v(" "),a("td",[e._v("A reference to a bean that implements the release strategy."),a("br"),e._v("The bean can be an implementation of the "),a("code",[e._v("ReleaseStrategy")]),e._v(" interface or a POJO."),a("br"),e._v("In the latter case, the "),a("code",[e._v("release-strategy-method")]),e._v(" attribute must also be defined."),a("br"),e._v("Optional (by default, the aggregator will use the "),a("code",[e._v("IntegrationMessageHeaderAccessor.SEQUENCE_SIZE")]),e._v(" header attribute).")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("13")])]),e._v(" "),a("td",[e._v("A method that is defined on the bean referenced by "),a("code",[e._v("release-strategy")]),e._v(" and that implements the completion decision algorithm."),a("br"),e._v("Optional, with restrictions (requires "),a("code",[e._v("release-strategy")]),e._v(" to be present).")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("14")])]),e._v(" "),a("td",[e._v("A SpEL expression representing the release strategy."),a("br"),e._v("The root object for the expression is a "),a("code",[e._v("MessageGroup")]),e._v("."),a("br"),e._v("Example: "),a("code",[e._v('"size() == 5"')]),e._v("."),a("br"),e._v("Only one of "),a("code",[e._v("release-strategy")]),e._v(" or "),a("code",[e._v("release-strategy-expression")]),e._v(" is allowed.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("15")])]),e._v(" "),a("td",[e._v("Only applies if a 
"),a("code",[e._v("MessageGroupStoreReaper")]),e._v(" is configured for the "),a("code",[e._v("")]),e._v(" "),a("code",[e._v("MessageStore")]),e._v("."),a("br"),e._v("By default, when a "),a("code",[e._v("MessageGroupStoreReaper")]),e._v(" is configured to expire partial groups, empty groups are also removed."),a("br"),e._v("Empty groups exist after a group is released normally."),a("br"),e._v("This is to enable the detection and discarding of late-arriving messages."),a("br"),e._v("If you wish to expire empty groups on a longer schedule than expiring partial groups, set this property."),a("br"),e._v("Empty groups are then not removed from the "),a("code",[e._v("MessageStore")]),e._v(" until they have not been modified for at least this number of milliseconds."),a("br"),e._v("Note that the actual time to expire an empty group is also affected by the reaper’s timeout property, and it could be as much as this value plus the timeout.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("16")])]),e._v(" "),a("td",[e._v("See "),a("RouterLink",{attrs:{to:"/en/spring-integration/aggregator.html#aggregator-xml"}},[e._v("Configuring an Aggregator with XML")]),e._v(".")],1)]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("17")])]),e._v(" "),a("td",[e._v("See "),a("RouterLink",{attrs:{to:"/en/spring-integration/aggregator.html#aggregator-xml"}},[e._v("Configuring an Aggregator with XML")]),e._v(".")],1)]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("18")])]),e._v(" "),a("td",[e._v("See "),a("RouterLink",{attrs:{to:"/en/spring-integration/aggregator.html#aggregator-xml"}},[e._v("Configuring an Aggregator with XML")]),e._v(".")],1)]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("19")])]),e._v(" "),a("td",[e._v("See "),a("RouterLink",{attrs:{to:"/en/spring-integration/aggregator.html#aggregator-xml"}},[e._v("Configuring an Aggregator with XML")]),e._v(".")],1)]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("20")])]),e._v(" "),a("td",[e._v("By default, when a group is completed due to a 
timeout (or by a "),a("code",[e._v("MessageGroupStoreReaper")]),e._v("), the empty group’s metadata is retained."),a("br"),e._v("Late arriving messages are immediately discarded."),a("br"),e._v("Set this to "),a("code",[e._v("true")]),e._v(" to remove the group completely."),a("br"),e._v("Then, late arriving messages start a new group and are not be discarded until the group again times out."),a("br"),e._v("The new group is never released normally because of the “hole” in the sequence range that caused the timeout."),a("br"),e._v("Empty groups can be expired (completely removed) later by using a "),a("code",[e._v("MessageGroupStoreReaper")]),e._v(" together with the "),a("code",[e._v("empty-group-min-timeout")]),e._v(" attribute."),a("br"),e._v("Starting with version 5.0, empty groups are also scheduled for removal after the "),a("code",[e._v("empty-group-min-timeout")]),e._v(" elapses."),a("br"),e._v("The default is 'false'.")])])])]),e._v(" "),a("p",[e._v("Also see "),a("RouterLink",{attrs:{to:"/en/spring-integration/aggregator.html#aggregator-expiring-groups"}},[e._v("Aggregator Expiring Groups")]),e._v(" for more information.")],1),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("Since there is no custom behavior to be implemented in Java classes for resequencers, there is no annotation support for it.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("h3",{attrs:{id:"message-handler-chain"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#message-handler-chain"}},[e._v("#")]),e._v(" Message Handler Chain")]),e._v(" "),a("p",[e._v("The "),a("code",[e._v("MessageHandlerChain")]),e._v(" is an implementation of "),a("code",[e._v("MessageHandler")]),e._v(" that can be configured as a single message endpoint while actually delegating to a chain of other handlers, such as filters, transformers, splitters, and so on.\nWhen several handlers need to be connected in a fixed, linear progression, this can lead to a much simpler configuration.\nFor 
example, it is fairly common to provide a transformer before other components.\nSimilarly, when you provide a filter before some other component in a chain, you essentially create a "),a("a",{attrs:{href:"https://www.enterpriseintegrationpatterns.com/MessageSelector.html",target:"_blank",rel:"noopener noreferrer"}},[e._v("selective consumer"),a("OutboundLink")],1),e._v(".\nIn either case, the chain requires only a single "),a("code",[e._v("input-channel")]),e._v(" and a single "),a("code",[e._v("output-channel")]),e._v(", eliminating the need to define channels for each individual component.")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("The "),a("code",[e._v("MessageHandlerChain")]),e._v(" is mostly designed for an XML configuration."),a("br"),e._v("For Java DSL, an "),a("code",[e._v("IntegrationFlow")]),e._v(" definition can be treated as a chain component, but it has nothing to do with concepts and principles described in this chapter below."),a("br"),e._v("See "),a("RouterLink",{attrs:{to:"/en/spring-integration/dsl.html#java-dsl"}},[e._v("Java DSL")]),e._v(" for more information.")],1)])]),e._v(" "),a("tbody")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("Spring Integration’s "),a("code",[e._v("Filter")]),e._v(" provides a boolean property: "),a("code",[e._v("throwExceptionOnRejection")]),e._v("."),a("br"),e._v("When you provide multiple selective consumers on the same point-to-point channel with different acceptance criteria, you should set this value 'true' (the default is "),a("code",[e._v("false")]),e._v(") so that the dispatcher knows that the message was rejected and, as a result, tries to pass the message on to other subscribers."),a("br"),e._v("If the exception were not thrown, it would appear to the dispatcher that the message had been passed on successfully even though the filter had dropped the message to prevent further processing."),a("br"),e._v("If you do indeed want to “drop” the messages, 
the filter’s 'discard-channel' might be useful, since it does give you a chance to perform some operation with the dropped message (such as sending it to a JMS queue or writing it to a log).")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("The handler chain simplifies configuration while internally maintaining the same degree of loose coupling between components, and it is trivial to modify the configuration if at some point a non-linear arrangement is required.")]),e._v(" "),a("p",[e._v("Internally, the chain is expanded into a linear setup of the listed endpoints, separated by anonymous channels.\nThe reply channel header is not taken into account within the chain.\nOnly after the last handler is invoked is the resulting message forwarded to the reply channel or the chain’s output channel.\nBecause of this setup, all handlers except the last must implement the "),a("code",[e._v("MessageProducer")]),e._v(" interface (which provides a 'setOutputChannel()' method).\nIf the "),a("code",[e._v("outputChannel")]),e._v(" on the "),a("code",[e._v("MessageHandlerChain")]),e._v(" is set, the last handler needs only an output channel.")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("As with other endpoints, the "),a("code",[e._v("output-channel")]),e._v(" is optional."),a("br"),e._v("If there is a reply message at the end of the chain, the output-channel takes precedence."),a("br"),e._v("However, if it is not available, the chain handler checks for a reply channel header on the inbound message as a fallback.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("In most cases, you need not implement "),a("code",[e._v("MessageHandler")]),e._v(" yourself.\nThe next section focuses on namespace support for the chain element.\nMost Spring Integration endpoints, such as service activators and transformers, are suitable for use within a "),a("code",[e._v("MessageHandlerChain")]),e._v(".")]),e._v(" 
"),a("h4",{attrs:{id:"configuring-a-chain"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#configuring-a-chain"}},[e._v("#")]),e._v(" Configuring a Chain")]),e._v(" "),a("p",[e._v("The "),a("code",[e._v("")]),e._v(" element provides an "),a("code",[e._v("input-channel")]),e._v(" attribute.\nIf the last element in the chain is capable of producing reply messages (optional), it also supports an "),a("code",[e._v("output-channel")]),e._v(" attribute.\nThe sub-elements are then filters, transformers, splitters, and service-activators.\nThe last element may also be a router or an outbound channel adapter.\nThe following example shows a chain definition:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n \n \n \n \n \n\n')])])]),a("p",[e._v("The "),a("code",[e._v("")]),e._v(" element used in the preceding example sets a message header named "),a("code",[e._v("thing1")]),e._v(" with a value of "),a("code",[e._v("thing2")]),e._v(" on the message.\nA header enricher is a specialization of "),a("code",[e._v("Transformer")]),e._v(" that touches only header values.\nYou could obtain the same result by implementing a "),a("code",[e._v("MessageHandler")]),e._v(" that did the header modifications and wiring that as a bean, but the header-enricher is a simpler option.")]),e._v(" "),a("p",[e._v("The "),a("code",[e._v("")]),e._v(" can be configured as the last “closed-box” consumer of the message flow.\nFor this solution, you can to put it at the end of the some , as the following example shows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n \n \n \n \n \n \n\n')])])]),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("Disallowed Attributes and Elements"),a("br"),a("br"),e._v("Certain attributes, such as "),a("code",[e._v("order")]),e._v(" and "),a("code",[e._v("input-channel")]),e._v(" are not allowed to be 
specified on components used within a chain."),a("br"),e._v("The same is true for the poller sub-element."),a("br"),a("br"),e._v("For the Spring Integration core components, the XML schema itself enforces some of these constraints."),a("br"),e._v("However, for non-core components or your own custom components, these constraints are enforced by the XML namespace parser, not by the XML schema."),a("br"),a("br"),e._v("These XML namespace parser constraints were added with Spring Integration 2.2."),a("br"),e._v("If you try to use disallowed attributes and elements, the XML namespace parser throws a "),a("code",[e._v("BeanDefinitionParsingException")]),e._v(".")])])]),e._v(" "),a("tbody")]),e._v(" "),a("h4",{attrs:{id:"using-the-id-attribute"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#using-the-id-attribute"}},[e._v("#")]),e._v(" Using the 'id' Attribute")]),e._v(" "),a("p",[e._v("Beginning with Spring Integration 3.0, if a chain element is given an "),a("code",[e._v("id")]),e._v(" attribute, the bean name for the element is a combination of the chain’s "),a("code",[e._v("id")]),e._v(" and the "),a("code",[e._v("id")]),e._v(" of the element itself.\nElements without "),a("code",[e._v("id")]),e._v(" attributes are not registered as beans, but each one is given a "),a("code",[e._v("componentName")]),e._v(" that includes the chain "),a("code",[e._v("id")]),e._v(".\nConsider the following example:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n \n \n\n')])])]),a("p",[e._v("In the preceding example:")]),e._v(" "),a("ul",[a("li",[a("p",[e._v("The "),a("code",[e._v("")]),e._v(" root element has an "),a("code",[e._v("id")]),e._v(" of 'somethingChain'.\nConsequently, the "),a("code",[e._v("AbstractEndpoint")]),e._v(" implementation ("),a("code",[e._v("PollingConsumer")]),e._v(" or "),a("code",[e._v("EventDrivenConsumer")]),e._v(", depending on the "),a("code",[e._v("input-channel")]),e._v(" 
type) bean takes this value as its bean name.")])]),e._v(" "),a("li",[a("p",[e._v("The "),a("code",[e._v("MessageHandlerChain")]),e._v(" bean acquires a bean alias ('somethingChain.handler'), which allows direct access to this bean from the "),a("code",[e._v("BeanFactory")]),e._v(".")])]),e._v(" "),a("li",[a("p",[e._v("The "),a("code",[e._v("")]),e._v(" is not a fully fledged messaging endpoint (it is not a "),a("code",[e._v("PollingConsumer")]),e._v(" or "),a("code",[e._v("EventDrivenConsumer")]),e._v(").\nIt is a "),a("code",[e._v("MessageHandler")]),e._v(" within the "),a("code",[e._v("")]),e._v(".\nIn this case, the bean name registered with the "),a("code",[e._v("BeanFactory")]),e._v(" is 'somethingChain$child.somethingService.handler'.")])]),e._v(" "),a("li",[a("p",[e._v("The "),a("code",[e._v("componentName")]),e._v(" of this "),a("code",[e._v("ServiceActivatingHandler")]),e._v(" takes the same value but without the '.handler' suffix.\nIt becomes 'somethingChain$child.somethingService'.")])]),e._v(" "),a("li",[a("p",[e._v("The last "),a("code",[e._v("")]),e._v(" sub-component, "),a("code",[e._v("")]),e._v(", does not have an "),a("code",[e._v("id")]),e._v(" attribute.\nIts "),a("code",[e._v("componentName")]),e._v(" is based on its position in the "),a("code",[e._v("")]),e._v(".\nIn this case, it is 'somethingChain$child#1'.\n(The final element of the name is the order within the chain, beginning with '#0').\nNote, this transformer is not registered as a bean within the application context, so it does not get a "),a("code",[e._v("beanName")]),e._v(".\nHowever its "),a("code",[e._v("componentName")]),e._v(" has a value that is useful for logging and other purposes.")])])]),e._v(" "),a("p",[e._v("The "),a("code",[e._v("id")]),e._v(" attribute for "),a("code",[e._v("")]),e._v(" elements lets them be eligible for "),a("RouterLink",{attrs:{to:"/en/spring-integration/jmx.html#jmx-mbean-exporter"}},[e._v("JMX export")]),e._v(", and they are trackable in the 
"),a("RouterLink",{attrs:{to:"/en/spring-integration/message-history.html#message-history"}},[e._v("message history")]),e._v(".\nYou can access them from the "),a("code",[e._v("BeanFactory")]),e._v(" by using the appropriate bean name, as discussed earlier.")],1),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("It is useful to provide an explicit "),a("code",[e._v("id")]),e._v(" attribute on "),a("code",[e._v("")]),e._v(" elements to simplify the identification of sub-components in logs and to provide access to them from the "),a("code",[e._v("BeanFactory")]),e._v(" etc.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("h4",{attrs:{id:"calling-a-chain-from-within-a-chain"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#calling-a-chain-from-within-a-chain"}},[e._v("#")]),e._v(" Calling a Chain from within a Chain")]),e._v(" "),a("p",[e._v("Sometimes, you need to make a nested call to another chain from within a chain and then come back and continue execution within the original chain.\nTo accomplish this, you can use a messaging gateway by including a element, as the following example shows:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n \n \n \n \n \n \n \n\n\n\n \n \n \n \n \n \n \n\n\n\n \n \n \n \n \n \n\n')])])]),a("p",[e._v("In the preceding example, "),a("code",[e._v("nested-chain-a")]),e._v(" is called at the end of "),a("code",[e._v("main-chain")]),e._v(" processing by the 'gateway' element configured there.\nWhile in "),a("code",[e._v("nested-chain-a")]),e._v(", a call to a "),a("code",[e._v("nested-chain-b")]),e._v(" is made after header enrichment.\nThen the flow comes back to finish execution in "),a("code",[e._v("nested-chain-b")]),e._v(".\nFinally, the flow returns to "),a("code",[e._v("main-chain")]),e._v(".\nWhen the nested version of a "),a("code",[e._v("")]),e._v(" element is defined in the chain, it does not require the 
"),a("code",[e._v("service-interface")]),e._v(" attribute.\nInstead, it takes the message in its current state and places it on the channel defined in the "),a("code",[e._v("request-channel")]),e._v(" attribute.\nWhen the downstream flow initiated by that gateway completes, a "),a("code",[e._v("Message")]),e._v(" is returned to the gateway and continues its journey within the current chain.")]),e._v(" "),a("h3",{attrs:{id:"scatter-gather"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#scatter-gather"}},[e._v("#")]),e._v(" Scatter-Gather")]),e._v(" "),a("p",[e._v("Starting with version 4.1, Spring Integration provides an implementation of the "),a("a",{attrs:{href:"https://www.enterpriseintegrationpatterns.com/BroadcastAggregate.html",target:"_blank",rel:"noopener noreferrer"}},[e._v("scatter-gather"),a("OutboundLink")],1),e._v(" enterprise integration pattern.\nIt is a compound endpoint for which the goal is to send a message to the recipients and aggregate the results.\nAs noted in "),a("a",{attrs:{href:"https://www.enterpriseintegrationpatterns.com/",target:"_blank",rel:"noopener noreferrer"}},[a("em",[e._v("Enterprise Integration Patterns")]),a("OutboundLink")],1),e._v(", it is a component for scenarios such as “best quote”, where we need to request information from several suppliers and decide which one provides us with the best term for the requested item.")]),e._v(" "),a("p",[e._v("Previously, the pattern could be configured by using discrete components.\nThis enhancement brings more convenient configuration.")]),e._v(" "),a("p",[e._v("The "),a("code",[e._v("ScatterGatherHandler")]),e._v(" is a request-reply endpoint that combines a "),a("code",[e._v("PublishSubscribeChannel")]),e._v(" (or a "),a("code",[e._v("RecipientListRouter")]),e._v(") and an "),a("code",[e._v("AggregatingMessageHandler")]),e._v(".\nThe request message is sent to the "),a("code",[e._v("scatter")]),e._v(" channel, and the "),a("code",[e._v("ScatterGatherHandler")]),e._v(" waits for 
the reply that the aggregator sends to the "),a("code",[e._v("outputChannel")]),e._v(".")]),e._v(" "),a("h4",{attrs:{id:"functionality-3"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#functionality-3"}},[e._v("#")]),e._v(" Functionality")]),e._v(" "),a("p",[e._v("The "),a("code",[e._v("Scatter-Gather")]),e._v(" pattern suggests two scenarios: “auction” and “distribution”.\nIn both cases, the "),a("code",[e._v("aggregation")]),e._v(" function is the same and provides all the options available for the "),a("code",[e._v("AggregatingMessageHandler")]),e._v(".\n(Actually, the "),a("code",[e._v("ScatterGatherHandler")]),e._v(" requires only an "),a("code",[e._v("AggregatingMessageHandler")]),e._v(" as a constructor argument.)\nSee "),a("RouterLink",{attrs:{to:"/en/spring-integration/aggregator.html#aggregator"}},[e._v("Aggregator")]),e._v(" for more information.")],1),e._v(" "),a("h5",{attrs:{id:"auction"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#auction"}},[e._v("#")]),e._v(" Auction")]),e._v(" "),a("p",[e._v("The auction "),a("code",[e._v("Scatter-Gather")]),e._v(" variant uses “publish-subscribe” logic for the request message, where the “scatter” channel is a "),a("code",[e._v("PublishSubscribeChannel")]),e._v(" with "),a("code",[e._v('apply-sequence="true"')]),e._v(".\nHowever, this channel can be any "),a("code",[e._v("MessageChannel")]),e._v(" implementation (as is the case with the "),a("code",[e._v("request-channel")]),e._v(" in the "),a("code",[e._v("ContentEnricher")]),e._v(" — see "),a("RouterLink",{attrs:{to:"/en/spring-integration/content-enrichment.html#content-enricher"}},[e._v("Content Enricher")]),e._v(").\nHowever, in this case, you should create your own custom "),a("code",[e._v("correlationStrategy")]),e._v(" for the "),a("code",[e._v("aggregation")]),e._v(" function.")],1),e._v(" "),a("h5",{attrs:{id:"distribution"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#distribution"}},[e._v("#")]),e._v(" Distribution")]),e._v(" 
"),a("p",[e._v("The distribution "),a("code",[e._v("Scatter-Gather")]),e._v(" variant is based on the "),a("code",[e._v("RecipientListRouter")]),e._v(" (see "),a("RouterLink",{attrs:{to:"/en/spring-integration/router.html#router-implementations-recipientlistrouter"}},[a("code",[e._v("RecipientListRouter")])]),e._v(") with all available options for the "),a("code",[e._v("RecipientListRouter")]),e._v(".\nThis is the second "),a("code",[e._v("ScatterGatherHandler")]),e._v(" constructor argument.\nIf you want to rely on only the default "),a("code",[e._v("correlationStrategy")]),e._v(" for the "),a("code",[e._v("recipient-list-router")]),e._v(" and the "),a("code",[e._v("aggregator")]),e._v(", you should specify "),a("code",[e._v('apply-sequence="true"')]),e._v(".\nOtherwise, you should supply a custom "),a("code",[e._v("correlationStrategy")]),e._v(" for the "),a("code",[e._v("aggregator")]),e._v(".\nUnlike the "),a("code",[e._v("PublishSubscribeChannel")]),e._v(" variant (the auction variant), having a "),a("code",[e._v("recipient-list-router")]),e._v(" "),a("code",[e._v("selector")]),e._v(" option lets filter target suppliers based on the message.\nWith "),a("code",[e._v('apply-sequence="true"')]),e._v(", the default "),a("code",[e._v("sequenceSize")]),e._v(" is supplied, and the "),a("code",[e._v("aggregator")]),e._v(" can release the group correctly.\nThe distribution option is mutually exclusive with the auction option.")],1),e._v(" "),a("p",[e._v("For both the auction and the distribution variants, the request (scatter) message is enriched with the "),a("code",[e._v("gatherResultChannel")]),e._v(" header to wait for a reply message from the "),a("code",[e._v("aggregator")]),e._v(".")]),e._v(" "),a("p",[e._v("By default, all suppliers should send their result to the "),a("code",[e._v("replyChannel")]),e._v(" header (usually by omitting the "),a("code",[e._v("output-channel")]),e._v(" from the ultimate endpoint).\nHowever, the 
"),a("code",[e._v("gatherChannel")]),e._v(" option is also provided, letting suppliers send their reply to that channel for the aggregation.")]),e._v(" "),a("h4",{attrs:{id:"configuring-a-scatter-gather-endpoint"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#configuring-a-scatter-gather-endpoint"}},[e._v("#")]),e._v(" Configuring a Scatter-Gather Endpoint")]),e._v(" "),a("p",[e._v("The following example shows Java configuration for the bean definition for "),a("code",[e._v("Scatter-Gather")]),e._v(":")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@Bean\npublic MessageHandler distributor() {\n RecipientListRouter router = new RecipientListRouter();\n router.setApplySequence(true);\n router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),\n distributionChannel3()));\n return router;\n}\n\n@Bean\npublic MessageHandler gatherer() {\n\treturn new AggregatingMessageHandler(\n\t\t\tnew ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),\n\t\t\tnew SimpleMessageStore(),\n\t\t\tnew HeaderAttributeCorrelationStrategy(\n\t\t\t IntegrationMessageHeaderAccessor.CORRELATION_ID),\n\t\t\tnew ExpressionEvaluatingReleaseStrategy("size() == 2"));\n}\n\n@Bean\n@ServiceActivator(inputChannel = "distributionChannel")\npublic MessageHandler scatterGatherDistribution() {\n\tScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());\n\thandler.setOutputChannel(output());\n\treturn handler;\n}\n')])])]),a("p",[e._v("In the preceding example, we configure the "),a("code",[e._v("RecipientListRouter")]),e._v(" "),a("code",[e._v("distributor")]),e._v(" bean with "),a("code",[e._v('applySequence="true"')]),e._v(" and the list of recipient channels.\nThe next bean is for an "),a("code",[e._v("AggregatingMessageHandler")]),e._v(".\nFinally, we inject both those beans into the "),a("code",[e._v("ScatterGatherHandler")]),e._v(" bean definition and 
mark it as a "),a("code",[e._v("@ServiceActivator")]),e._v(" to wire the scatter-gather component into the integration flow.")]),e._v(" "),a("p",[e._v("The following example shows how to configure the "),a("code",[e._v("")]),e._v(" endpoint by using the XML namespace:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v(' (11)\n\t\t\t (12)\n\t\t\t (13)\n\n')])])]),a("table",[a("thead",[a("tr",[a("th",[a("strong",[e._v("1")])]),e._v(" "),a("th",[e._v("The id of the endpoint."),a("br"),e._v("The "),a("code",[e._v("ScatterGatherHandler")]),e._v(" bean is registered with an alias of "),a("code",[e._v("id + '.handler'")]),e._v("."),a("br"),e._v("The "),a("code",[e._v("RecipientListRouter")]),e._v(" bean is registered with an alias of "),a("code",[e._v("id + '.scatterer'")]),e._v("."),a("br"),e._v("The "),a("code",[e._v("AggregatingMessageHandler")]),e._v("bean is registered with an alias of "),a("code",[e._v("id + '.gatherer'")]),e._v("."),a("br"),e._v("Optional."),a("br"),e._v("(The "),a("code",[e._v("BeanFactory")]),e._v(" generates a default "),a("code",[e._v("id")]),e._v(" value.)")])])]),e._v(" "),a("tbody",[a("tr",[a("td",[a("strong",[e._v("2")])]),e._v(" "),a("td",[e._v("Lifecycle attribute signaling whether the endpoint should be started during application context initialization."),a("br"),e._v("In addition, the "),a("code",[e._v("ScatterGatherHandler")]),e._v(" also implements "),a("code",[e._v("Lifecycle")]),e._v(" and starts and stops "),a("code",[e._v("gatherEndpoint")]),e._v(", which is created internally if a "),a("code",[e._v("gather-channel")]),e._v(" is provided."),a("br"),e._v("Optional."),a("br"),e._v("(The default is "),a("code",[e._v("true")]),e._v(".)")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("3")])]),e._v(" "),a("td",[e._v("The channel on which to receive request messages to handle them in the 
"),a("code",[e._v("ScatterGatherHandler")]),e._v("."),a("br"),e._v("Required.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("4")])]),e._v(" "),a("td",[e._v("The channel to which the "),a("code",[e._v("ScatterGatherHandler")]),e._v(" sends the aggregation results."),a("br"),e._v("Optional."),a("br"),e._v("(Incoming messages can specify a reply channel themselves in the "),a("code",[e._v("replyChannel")]),e._v(" message header).")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("5")])]),e._v(" "),a("td",[e._v("The channel to which to send the scatter message for the auction scenario."),a("br"),e._v("Optional."),a("br"),e._v("Mutually exclusive with the "),a("code",[e._v("")]),e._v(" sub-element.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("6")])]),e._v(" "),a("td",[e._v("The channel on which to receive replies from each supplier for the aggregation."),a("br"),e._v("It is used as the "),a("code",[e._v("replyChannel")]),e._v(" header in the scatter message."),a("br"),e._v("Optional."),a("br"),e._v("By default, the "),a("code",[e._v("FixedSubscriberChannel")]),e._v(" is created.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("7")])]),e._v(" "),a("td",[e._v("The order of this component when more than one handler is subscribed to the same "),a("code",[e._v("DirectChannel")]),e._v(" (use for load balancing purposes)."),a("br"),e._v("Optional.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("8")])]),e._v(" "),a("td",[e._v("Specifies the phase in which the endpoint should be started and stopped."),a("br"),e._v("The startup order proceeds from lowest to highest, and the shutdown order is from highest to lowest."),a("br"),e._v("By default, this value is "),a("code",[e._v("Integer.MAX_VALUE")]),e._v(", meaning that this container starts as late as possible and stops as soon as possible."),a("br"),e._v("Optional.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("9")])]),e._v(" "),a("td",[e._v("The timeout interval to wait when sending a reply 
"),a("code",[e._v("Message")]),e._v(" to the "),a("code",[e._v("output-channel")]),e._v("."),a("br"),e._v("By default, the send blocks for one second."),a("br"),e._v("It applies only if the output channel has some 'sending' limitations — for example, a "),a("code",[e._v("QueueChannel")]),e._v(" with a fixed 'capacity' that is full."),a("br"),e._v("In this case, a "),a("code",[e._v("MessageDeliveryException")]),e._v(" is thrown."),a("br"),e._v("The "),a("code",[e._v("send-timeout")]),e._v(" is ignored for "),a("code",[e._v("AbstractSubscribableChannel")]),e._v(" implementations."),a("br"),e._v("For "),a("code",[e._v("group-timeout(-expression)")]),e._v(", the "),a("code",[e._v("MessageDeliveryException")]),e._v(" from the scheduled expire task leads this task to be rescheduled."),a("br"),e._v("Optional.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("10")])]),e._v(" "),a("td",[e._v("Lets you specify how long the scatter-gather waits for the reply message before returning."),a("br"),e._v("By default, it waits indefinitely."),a("br"),e._v("'null' is returned if the reply times out."),a("br"),e._v("Optional."),a("br"),e._v("It defaults to "),a("code",[e._v("-1")]),e._v(", meaning to wait indefinitely.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("11")])]),e._v(" "),a("td",[e._v("Specifies whether the scatter-gather must return a non-null value."),a("br"),e._v("This value is "),a("code",[e._v("true")]),e._v(" by default."),a("br"),e._v("Consequently, a "),a("code",[e._v("ReplyRequiredException")]),e._v(" is thrown when the underlying aggregator returns a null value after "),a("code",[e._v("gather-timeout")]),e._v("."),a("br"),e._v("Note, if "),a("code",[e._v("null")]),e._v(" is a possibility, the "),a("code",[e._v("gather-timeout")]),e._v(" should be specified to avoid an indefinite wait.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("12")])]),e._v(" "),a("td",[e._v("The "),a("code",[e._v("")]),e._v(" options."),a("br"),e._v("Optional."),a("br"),e._v("Mutually 
exclusive with "),a("code",[e._v("scatter-channel")]),e._v(" attribute.")])]),e._v(" "),a("tr",[a("td",[a("strong",[e._v("13")])]),e._v(" "),a("td",[e._v("The "),a("code",[e._v("")]),e._v(" options."),a("br"),e._v("Required.")])])])]),e._v(" "),a("h4",{attrs:{id:"error-handling"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#error-handling"}},[e._v("#")]),e._v(" Error Handling")]),e._v(" "),a("p",[e._v("Since Scatter-Gather is a multi request-reply component, error handling has some extra complexity.\nIn some cases, it is better to just catch and ignore downstream exceptions if the "),a("code",[e._v("ReleaseStrategy")]),e._v(" allows the process to finish with fewer replies than requests.\nIn other cases something like a “compensation message” should be considered for returning from sub-flow, when an error happens.")]),e._v(" "),a("p",[e._v("Every async sub-flow should be configured with a "),a("code",[e._v("errorChannel")]),e._v(" header for the proper error message sending from the "),a("code",[e._v("MessagePublishingErrorHandler")]),e._v(".\nOtherwise, an error will be sent to the global "),a("code",[e._v("errorChannel")]),e._v(" with the common error handling logic.\nSee "),a("RouterLink",{attrs:{to:"/en/spring-integration/error-handling.html#error-handling"}},[e._v("Error Handling")]),e._v(" for more information about async error processing.")],1),e._v(" "),a("p",[e._v("Synchronous flows may use an "),a("code",[e._v("ExpressionEvaluatingRequestHandlerAdvice")]),e._v(" for ignoring the exception or returning a compensation message.\nWhen an exception is thrown from one of the sub-flows to the "),a("code",[e._v("ScatterGatherHandler")]),e._v(", it is just re-thrown to upstream.\nThis way all other sub-flows will work for nothing and their replies are going to be ignored in the "),a("code",[e._v("ScatterGatherHandler")]),e._v(".\nThis might be an expected behavior sometimes, but in most cases it would be better to handle the error in the particular sub-flow 
without impacting all others and the expectations in the gatherer.")]),e._v(" "),a("p",[e._v("Starting with version 5.1.3, the "),a("code",[e._v("ScatterGatherHandler")]),e._v(" is supplied with the "),a("code",[e._v("errorChannelName")]),e._v(" option.\nIt is populated to the "),a("code",[e._v("errorChannel")]),e._v(" header of the scatter message and is used in the when async error happens or can be used in the regular synchronous sub-flow for directly sending an error message.")]),e._v(" "),a("p",[e._v("The sample configuration below demonstrates async error handling by returning a compensation message:")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@Bean\npublic IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {\n return f -> f\n .scatterGather(\n scatterer -> scatterer\n .applySequence(true)\n .recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))\n .recipientFlow(f2 -> f2\n .channel(c -> c.executor(taskExecutor))\n .transform(p -> {\n throw new RuntimeException("Sub-flow#2");\n })),\n null,\n s -> s.errorChannel("scatterGatherErrorChannel"));\n}\n\n@ServiceActivator(inputChannel = "scatterGatherErrorChannel")\npublic Message processAsyncScatterError(MessagingException payload) {\n return MessageBuilder.withPayload(payload.getCause().getCause())\n .copyHeaders(payload.getFailedMessage().getHeaders())\n .build();\n}\n')])])]),a("p",[e._v("To produce a proper reply, we have to copy headers (including "),a("code",[e._v("replyChannel")]),e._v(" and "),a("code",[e._v("errorChannel")]),e._v(") from the "),a("code",[e._v("failedMessage")]),e._v(" of the "),a("code",[e._v("MessagingException")]),e._v(" that has been sent to the "),a("code",[e._v("scatterGatherErrorChannel")]),e._v(" by the "),a("code",[e._v("MessagePublishingErrorHandler")]),e._v(".\nThis way the target exception is returned to the gatherer of the 
"),a("code",[e._v("ScatterGatherHandler")]),e._v(" for reply messages group completion.\nSuch an exception "),a("code",[e._v("payload")]),e._v(" can be filtered out in the "),a("code",[e._v("MessageGroupProcessor")]),e._v(" of the gatherer or processed other way downstream, after the scatter-gather endpoint.")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("Before sending scattering results to the gatherer, "),a("code",[e._v("ScatterGatherHandler")]),e._v(" reinstates the request message headers, including reply and error channels if any."),a("br"),e._v("This way errors from the "),a("code",[e._v("AggregatingMessageHandler")]),e._v(" are going to be propagated to the caller, even if an async hand off is applied in scatter recipient subflows."),a("br"),e._v("For successful operation, a "),a("code",[e._v("gatherResultChannel")]),e._v(", "),a("code",[e._v("originalReplyChannel")]),e._v(" and "),a("code",[e._v("originalErrorChannel")]),e._v(" headers must be transferred back to replies from scatter recipient subflows."),a("br"),e._v("In this case a reasonable, finite "),a("code",[e._v("gatherTimeout")]),e._v(" must be configured for the "),a("code",[e._v("ScatterGatherHandler")]),e._v("."),a("br"),e._v("Otherwise it is going to be blocked waiting for a reply from the gatherer forever, by default.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("h3",{attrs:{id:"thread-barrier"}},[a("a",{staticClass:"header-anchor",attrs:{href:"#thread-barrier"}},[e._v("#")]),e._v(" Thread Barrier")]),e._v(" "),a("p",[e._v("Sometimes, we need to suspend a message flow thread until some other asynchronous event occurs.\nFor example, consider an HTTP request that publishes a message to RabbitMQ.\nWe might wish to not reply to the user until the RabbitMQ broker has issued an acknowledgment that the message was received.")]),e._v(" "),a("p",[e._v("In version 4.2, Spring Integration introduced the "),a("code",[e._v("")]),e._v(" component for this purpose.\nThe underlying 
"),a("code",[e._v("MessageHandler")]),e._v(" is the "),a("code",[e._v("BarrierMessageHandler")]),e._v(".\nThis class also implements "),a("code",[e._v("MessageTriggerAction")]),e._v(", in which a message passed to the "),a("code",[e._v("trigger()")]),e._v(" method releases a corresponding thread in the "),a("code",[e._v("handleRequestMessage()")]),e._v(" method (if present).")]),e._v(" "),a("p",[e._v("The suspended thread and trigger thread are correlated by invoking a "),a("code",[e._v("CorrelationStrategy")]),e._v(" on the messages.\nWhen a message is sent to the "),a("code",[e._v("input-channel")]),e._v(", the thread is suspended for up to "),a("code",[e._v("requestTimeout")]),e._v(" milliseconds, waiting for a corresponding trigger message.\nThe default correlation strategy uses the "),a("code",[e._v("IntegrationMessageHeaderAccessor.CORRELATION_ID")]),e._v(" header.\nWhen a trigger message arrives with the same correlation, the thread is released.\nThe message sent to the "),a("code",[e._v("output-channel")]),e._v(" after release is constructed by using a "),a("code",[e._v("MessageGroupProcessor")]),e._v(".\nBy default, the message is a "),a("code",[e._v("Collection")]),e._v(" of the two payloads, and the headers are merged by using a "),a("code",[e._v("DefaultAggregatingMessageGroupProcessor")]),e._v(".")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("If the "),a("code",[e._v("trigger()")]),e._v(" method is invoked first (or after the main thread times out), it is suspended for up to "),a("code",[e._v("triggerTimeout")]),e._v(" waiting for the suspending message to arrive."),a("br"),e._v("If you do not want to suspend the trigger thread, consider handing off to a "),a("code",[e._v("TaskExecutor")]),e._v(" instead so that its thread is suspended instead.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("Prior version 5.4, there was only one "),a("code",[e._v("timeout")]),e._v(" 
option for both request and trigger messages, but in some use-case it is better to have different timeouts for those actions."),a("br"),e._v("Therefore "),a("code",[e._v("requestTimeout")]),e._v(" and "),a("code",[e._v("triggerTimeout")]),e._v(" options have been introduced.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("The "),a("code",[e._v("requires-reply")]),e._v(" property determines the action to take if the suspended thread times out before the trigger message arrives.\nBy default, it is "),a("code",[e._v("false")]),e._v(", which means the endpoint returns "),a("code",[e._v("null")]),e._v(", the flow ends, and the thread returns to the caller.\nWhen "),a("code",[e._v("true")]),e._v(", a "),a("code",[e._v("ReplyRequiredException")]),e._v(" is thrown.")]),e._v(" "),a("p",[e._v("You can call the "),a("code",[e._v("trigger()")]),e._v(" method programmatically (obtain the bean reference by using the name, "),a("code",[e._v("barrier.handler")]),e._v(" — where "),a("code",[e._v("barrier")]),e._v(" is the bean name of the barrier endpoint).\nAlternatively, you can configure an "),a("code",[e._v("")]),e._v(" to trigger the release.")]),e._v(" "),a("table",[a("thead",[a("tr",[a("th"),e._v(" "),a("th",[e._v("Only one thread can be suspended with the same correlation."),a("br"),e._v("The same correlation can be used multiple times but only once concurrently."),a("br"),e._v("An exception is thrown if a second thread arrives with the same correlation.")])])]),e._v(" "),a("tbody")]),e._v(" "),a("p",[e._v("The following example shows how to use a custom header for correlation:")]),e._v(" "),a("p",[e._v("Java")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('@ServiceActivator(inputChannel="in")\n@Bean\npublic BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {\n BarrierMessageHandler barrier = new BarrierMessageHandler(10000);\n 
barrier.setOutputChannel(out());\n barrier.setDiscardChannel(lateTriggerChannel);\n return barrier;\n}\n\n@ServiceActivator (inputChannel="release")\n@Bean\npublic MessageHandler releaser(MessageTriggerAction barrier) {\n return barrier::trigger(message);\n}\n')])])]),a("p",[e._v("XML")]),e._v(" "),a("div",{staticClass:"language- extra-class"},[a("pre",{pre:!0,attrs:{class:"language-text"}},[a("code",[e._v('\n\n\n\n')])])]),a("p",[e._v("Depending on which one has a message arrive first, either the thread sending a message to "),a("code",[e._v("in")]),e._v(" or the thread sending a message to "),a("code",[e._v("release")]),e._v(" waits for up to ten seconds until the other message arrives.\nWhen the message is released, the "),a("code",[e._v("out")]),e._v(" channel is sent a message that combines the result of invoking the custom "),a("code",[e._v("MessageGroupProcessor")]),e._v(" bean, named "),a("code",[e._v("myOutputProcessor")]),e._v(".\nIf the main thread times out and a trigger arrives later, you can configure a discard channel to which the late trigger is sent.")]),e._v(" "),a("p",[e._v("For an example of this component, see the "),a("a",{attrs:{href:"https://github.com/spring-projects/spring-integration-samples/tree/main/basic/barrier",target:"_blank",rel:"noopener noreferrer"}},[e._v("barrier sample application"),a("OutboundLink")],1),e._v(".")])])}),[],!1,null,null,null);t.default=r.exports}}]);