/**
 * Webpack JSONP chunk 147, containing the single module 571.
 *
 * Module 571 builds a component object by handing a compiled render function
 * to the helper exported as `a` by module 56, and exposes the resulting
 * `exports` object as its default export. The render function produces the
 * "Stream Support" documentation page wrapped in a ContentSlotsDistributor.
 *
 * NOTE(review): module 56 is presumably vue-loader's component normalizer and
 * `r` webpack's "mark as ES module" helper; neither is visible from this
 * chunk, so confirm against the runtime bundle.
 */
(window.webpackJsonp = window.webpackJsonp || []).push([
  [147],
  {
    571: function (webpackModule, webpackExports, webpackRequire) {
      "use strict";
      webpackRequire.r(webpackExports);
      var helperModule = webpackRequire(56);
      var component = Object(helperModule.a)(
        {},
        function () {
          var vm = this;
          var createElement = vm.$createElement;
          var h = vm._self._c || createElement;

          // Heading with the leading "#" permalink anchor that targets its own id.
          function heading(tag, id, title) {
            return h(tag, { attrs: { id: id } }, [
              h("a", { staticClass: "header-anchor", attrs: { href: "#" + id } }, [vm._v("#")]),
              vm._v(" " + title),
            ]);
          }

          // Inline <code> element holding a single text node.
          function inlineCode(value) {
            return h("code", [vm._v(value)]);
          }

          // Fenced plain-text code block: div > pre > code > text.
          function codeBlock(value) {
            return h("div", { staticClass: "language- extra-class" }, [
              h("pre", { pre: true, attrs: { class: "language-text" } }, [h("code", [vm._v(value)])]),
            ]);
          }

          // Header-only two-column table (empty first cell, note content in the
          // second cell) followed by an empty <tbody>.
          function noteTable(cellChildren) {
            return h("table", [
              h("thead", [h("tr", [h("th"), vm._v(" "), h("th", cellChildren)])]),
              vm._v(" "),
              h("tbody"),
            ]);
          }

          // The sibling order below (including where a " " text node is and is
          // not present between siblings) mirrors the compiled template exactly.
          return h("ContentSlotsDistributor", { attrs: { "slot-key": vm.$parent.slotKey } }, [
            heading("h1", "stream-support", "Stream Support"),
            vm._v(" "),
            heading("h2", "stream-support-2", "Stream Support"),
            vm._v(" "),
            h("p", [
              vm._v("In many cases, application data is obtained from a stream.\nIt is not recommended to send a reference to a stream as a message payload to a consumer.\nInstead, messages are created from data that is read from an input stream, and message payloads are written to an output stream one by one."),
            ]),
            vm._v(" "),
            h("p", [vm._v("You need to include this dependency into your project:")]),
            vm._v(" "),
            h("p", [vm._v("Maven")]),
            vm._v(" "),
            codeBlock("\n org.springframework.integration\n spring-integration-stream\n 5.5.9\n\n"),
            h("p", [vm._v("Gradle")]),
            vm._v(" "),
            codeBlock('compile "org.springframework.integration:spring-integration-stream:5.5.9"\n'),
            heading("h3", "reading-from-streams", "Reading from Streams"),
            vm._v(" "),
            h("p", [
              vm._v("Spring Integration provides two adapters for streams.\nBoth "),
              inlineCode("ByteStreamReadingMessageSource"),
              vm._v(" and "),
              inlineCode("CharacterStreamReadingMessageSource"),
              vm._v(" implement "),
              inlineCode("MessageSource"),
              vm._v(".\nBy configuring one of these within a channel-adapter element, the polling period can be configured and the message bus can automatically detect and schedule them.\nThe byte stream version requires an "),
              inlineCode("InputStream"),
              vm._v(", and the character stream version requires a "),
              inlineCode("Reader"),
              vm._v(" as the single constructor argument.\nThe "),
              inlineCode("ByteStreamReadingMessageSource"),
              vm._v(" also accepts the 'bytesPerMessage' property to determine how many bytes it tries to read into each "),
              inlineCode("Message"),
              vm._v(".\nThe default value is 1024.\nThe following example creates an input stream that creates messages that each contain 2048 bytes:"),
            ]),
            vm._v(" "),
            codeBlock('\n \n \n\n\n\n \n\n'),
            h("p", [
              vm._v("The "),
              inlineCode("CharacterStreamReadingMessageSource"),
              vm._v(" wraps the reader in a "),
              inlineCode("BufferedReader"),
              vm._v(" (if it is not one already).\nYou can set the buffer size used by the buffered reader in the second constructor argument.\nStarting with version 5.0, a third constructor argument ("),
              inlineCode("blockToDetectEOF"),
              vm._v(") controls the behavior of the "),
              inlineCode("CharacterStreamReadingMessageSource"),
              vm._v(".\nWhen "),
              inlineCode("false"),
              vm._v(" (the default), the "),
              inlineCode("receive()"),
              vm._v(" method checks whether the reader is "),
              inlineCode("ready()"),
              vm._v(" and returns null if not.\nEOF (end of file) is not detected in this case.\nWhen "),
              inlineCode("true"),
              vm._v(", the "),
              inlineCode("receive()"),
              vm._v(" method blocks until data is available or EOF is detected on the underlying stream.\nWhen EOF is detected, a "),
              inlineCode("StreamClosedEvent"),
              vm._v(" (application event) is published.\nYou can consume this event with a bean that implements "),
              inlineCode("ApplicationListener"),
              vm._v("."),
            ]),
            vm._v(" "),
            noteTable([
              vm._v("To facilitate EOF detection, the poller thread blocks in the "),
              inlineCode("receive()"),
              vm._v(" method until either data arrives or EOF is detected."),
            ]),
            vm._v(" "),
            noteTable([
              vm._v("The poller continues to publish an event on each poll once EOF has been detected."),
              h("br"),
              vm._v("The application listener can stop the adapter to prevent this."),
              h("br"),
              vm._v("The event is published on the poller thread."),
              h("br"),
              vm._v("Stopping the adapter causes the thread to be interrupted."),
              h("br"),
              vm._v("If you intend to perform some interruptible task after stopping the adapter, you must either perform the "),
              inlineCode("stop()"),
              vm._v(" on a different thread or use a different thread for those downstream activities."),
              h("br"),
              vm._v("Note that sending to a "),
              inlineCode("QueueChannel"),
              vm._v(" is interruptible, so, if you wish to send a message from the listener, do it before stopping the adapter."),
            ]),
            vm._v(" "),
            h("p", [
              vm._v("This facilitates “piping” or redirecting data to "),
              inlineCode("stdin"),
              vm._v(", as the following two examples shows:"),
            ]),
            vm._v(" "),
            codeBlock("cat myfile.txt | java -jar my.jar\n"),
            codeBlock("java -jar my.jar < foo.txt\n"),
            h("p", [vm._v("This approach lets the application stop when the pipe is closed.")]),
            vm._v(" "),
            h("p", [vm._v("Four convenient factory methods are available:")]),
            vm._v(" "),
            codeBlock("public static final CharacterStreamReadingMessageSource stdin() { ... }\n\npublic static final CharacterStreamReadingMessageSource stdin(String charsetName) { ... }\n\npublic static final CharacterStreamReadingMessageSource stdinPipe() { ... }\n\npublic static final CharacterStreamReadingMessageSource stdinPipe(String charsetName) { ... }\n"),
            heading("h3", "writing-to-streams", "Writing to Streams"),
            vm._v(" "),
            h(
              "p",
              [
                vm._v("For target streams, you can use either of two implementations: "),
                inlineCode("ByteStreamWritingMessageHandler"),
                vm._v(" or "),
                inlineCode("CharacterStreamWritingMessageHandler"),
                vm._v(".\nEach requires a single constructor argument ("),
                inlineCode("OutputStream"),
                vm._v(" for byte streams or "),
                inlineCode("Writer"),
                vm._v(" for character streams), and each provides a second constructor that adds the optional 'bufferSize'.\nSince both of these ultimately implement the "),
                inlineCode("MessageHandler"),
                vm._v(" interface, you can reference them from a "),
                inlineCode("channel-adapter"),
                vm._v(" configuration, as described in "),
                h("RouterLink", { attrs: { to: "/en/spring-integration/channel-adapter.html#channel-adapter" } }, [vm._v("Channel Adapter")]),
                vm._v("."),
              ],
              1
            ),
            vm._v(" "),
            codeBlock('\n \n \n\n\n\n \n\n'),
            heading("h3", "stream-namespace-support", "Stream Namespace Support"),
            vm._v(" "),
            h("p", [
              vm._v("Spring Integration defines a namespace to reduce the configuration needed for stream-related channel adapters.\nThe following schema locations are needed to use it:"),
            ]),
            vm._v(" "),
            codeBlock('\n\n'),
            h("p", [
              vm._v("The following code snippet shows the different configuration options that are supported to configure the inbound channel adapter:"),
            ]),
            vm._v(" "),
            codeBlock('\n\n\n'),
            h("p", [
              vm._v("Starting with version 5.0, you can set the "),
              inlineCode("detect-eof"),
              vm._v(" attribute, which sets the "),
              inlineCode("blockToDetectEOF"),
              vm._v(" property.\nSee "),
              // Fix: the link previously pointed at "#stream-reading", an id that
              // no element on this page carries; the section heading is rendered
              // with id "reading-from-streams".
              h("a", { attrs: { href: "#reading-from-streams" } }, [vm._v("Reading from Streams")]),
              vm._v(" for more information."),
            ]),
            vm._v(" "),
            h("p", [
              vm._v("To configure the outbound channel adapter, you can use the namespace support as well.\nThe following example shows the different configuration for an outbound channel adapters:"),
            ]),
            vm._v(" "),
            codeBlock('\n\n\n\n\n\n\n'),
          ]);
        },
        [],
        false,
        null,
        null,
        null
      );
      webpackExports.default = component.exports;
    },
  },
]);