DataBufferUtils.java 28.5 KB
Newer Older
A
Arjen Poutsma 已提交
1
/*
2
 * Copyright 2002-2018 the original author or authors.
A
Arjen Poutsma 已提交
3 4 5 6 7 8 9 10 11 12 13 14 15 16
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

17
package org.springframework.core.io.buffer;
A
Arjen Poutsma 已提交
18

19
import java.io.File;
A
Arjen Poutsma 已提交
20
import java.io.IOException;
A
Arjen Poutsma 已提交
21
import java.io.InputStream;
22
import java.io.OutputStream;
A
Arjen Poutsma 已提交
23
import java.nio.ByteBuffer;
24 25
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.Channel;
A
Arjen Poutsma 已提交
26
import java.nio.channels.Channels;
27
import java.nio.channels.CompletionHandler;
A
Arjen Poutsma 已提交
28
import java.nio.channels.ReadableByteChannel;
29
import java.nio.channels.WritableByteChannel;
30
import java.nio.file.StandardOpenOption;
31
import java.util.concurrent.Callable;
32
import java.util.concurrent.atomic.AtomicBoolean;
A
Arjen Poutsma 已提交
33
import java.util.concurrent.atomic.AtomicLong;
34
import java.util.function.Consumer;
35
import java.util.function.IntPredicate;
A
Arjen Poutsma 已提交
36 37

import org.reactivestreams.Publisher;
38 39
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
A
Arjen Poutsma 已提交
40
import reactor.core.publisher.Flux;
41
import reactor.core.publisher.FluxSink;
A
Arjen Poutsma 已提交
42
import reactor.core.publisher.Mono;
43
import reactor.core.publisher.SynchronousSink;
A
Arjen Poutsma 已提交
44

45
import org.springframework.core.io.Resource;
46
import org.springframework.lang.Nullable;
A
Arjen Poutsma 已提交
47 48
import org.springframework.util.Assert;

B
Brian Clozel 已提交
49
/**
P
Phillip Webb 已提交
50
 * Utility class for working with {@link DataBuffer DataBuffers}.
51
 *
A
Arjen Poutsma 已提交
52
 * @author Arjen Poutsma
B
Brian Clozel 已提交
53
 * @author Brian Clozel
54
 * @since 5.0
A
Arjen Poutsma 已提交
55 56 57
 */
public abstract class DataBufferUtils {

58 59
	private static final Consumer<DataBuffer> RELEASE_CONSUMER = DataBufferUtils::release;

J
Juergen Hoeller 已提交
60

61 62 63 64
	//---------------------------------------------------------------------
	// Reading
	//---------------------------------------------------------------------

A
Arjen Poutsma 已提交
65
	/**
66
	 * Read the given {@code InputStream} into a <strong>read-once</strong> {@code Flux} of
A
Arjen Poutsma 已提交
67
	 * {@code DataBuffer}s. Closes the input stream when the flux is terminated.
68 69 70
	 * <p>The resulting {@code Flux} can only be subscribed to once. See
	 * {@link #readInputStream(Callable, DataBufferFactory, int)} for a variant that supports
	 * multiple subscriptions.
A
Arjen Poutsma 已提交
71
	 * @param inputStream the input stream to read from
72
	 * @param dataBufferFactory the factory to create data buffers with
A
Arjen Poutsma 已提交
73 74
	 * @param bufferSize the maximum size of the data buffers
	 * @return a flux of data buffers read from the given channel
75 76
	 * @deprecated as of Spring 5.0.3, in favor of
	 * {@link #readInputStream(Callable, DataBufferFactory, int)}, to be removed in Spring 5.1
A
Arjen Poutsma 已提交
77
	 */
78
	@Deprecated
J
Juergen Hoeller 已提交
79 80 81
	public static Flux<DataBuffer> read(
			InputStream inputStream, DataBufferFactory dataBufferFactory, int bufferSize) {

82 83
		return readInputStream(() -> inputStream, dataBufferFactory, bufferSize);
	}
84

85
	/**
J
Juergen Hoeller 已提交
86 87
	 * Obtain a {@link InputStream} from the given supplier, and read it into a {@code Flux}
	 * of {@code DataBuffer}s. Closes the input stream when the flux is terminated.
88 89 90 91 92
	 * @param inputStreamSupplier the supplier for the input stream to read from
	 * @param dataBufferFactory the factory to create data buffers with
	 * @param bufferSize the maximum size of the data buffers
	 * @return a flux of data buffers read from the given channel
	 */
J
Juergen Hoeller 已提交
93 94
	public static Flux<DataBuffer> readInputStream(
			Callable<InputStream> inputStreamSupplier, DataBufferFactory dataBufferFactory, int bufferSize) {
A
Arjen Poutsma 已提交
95

96 97
		Assert.notNull(inputStreamSupplier, "'inputStreamSupplier' must not be null");

J
Juergen Hoeller 已提交
98
		return readByteChannel(() -> Channels.newChannel(inputStreamSupplier.call()), dataBufferFactory, bufferSize);
A
Arjen Poutsma 已提交
99 100 101
	}

	/**
J
Juergen Hoeller 已提交
102 103
	 * Read the given {@code ReadableByteChannel} into a <strong>read-once</strong> {@code Flux}
	 * of {@code DataBuffer}s. Closes the channel when the flux is terminated.
104 105 106
	 * <p>The resulting {@code Flux} can only be subscribed to once. See
	 * {@link #readByteChannel(Callable, DataBufferFactory, int)} for a variant that supports
	 * multiple subscriptions.
A
Arjen Poutsma 已提交
107
	 * @param channel the channel to read from
108
	 * @param dataBufferFactory the factory to create data buffers with
A
Arjen Poutsma 已提交
109 110
	 * @param bufferSize the maximum size of the data buffers
	 * @return a flux of data buffers read from the given channel
111 112
	 * @deprecated as of Spring 5.0.3, in favor of
	 * {@link #readByteChannel(Callable, DataBufferFactory, int)}, to be removed in Spring 5.1
A
Arjen Poutsma 已提交
113
	 */
114
	@Deprecated
J
Juergen Hoeller 已提交
115 116 117
	public static Flux<DataBuffer> read(
			ReadableByteChannel channel, DataBufferFactory dataBufferFactory, int bufferSize) {

118 119
		return readByteChannel(() -> channel, dataBufferFactory, bufferSize);
	}
120

121 122 123 124 125 126 127 128
	/**
	 * Obtain a {@link ReadableByteChannel} from the given supplier, and read it into a
	 * {@code Flux} of {@code DataBuffer}s. Closes the channel when the flux is terminated.
	 * @param channelSupplier the supplier for the channel to read from
	 * @param dataBufferFactory the factory to create data buffers with
	 * @param bufferSize the maximum size of the data buffers
	 * @return a flux of data buffers read from the given channel
	 */
J
Juergen Hoeller 已提交
129 130
	public static Flux<DataBuffer> readByteChannel(
			Callable<ReadableByteChannel> channelSupplier, DataBufferFactory dataBufferFactory, int bufferSize) {
131 132 133

		Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
		Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
A
Arjen Poutsma 已提交
134
		Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
A
Arjen Poutsma 已提交
135

136 137 138 139 140 141 142 143 144
		return Flux.using(channelSupplier,
				channel -> {
					ReadableByteChannelGenerator generator =
							new ReadableByteChannelGenerator(channel, dataBufferFactory,
									bufferSize);
					return Flux.generate(generator);
				},
				DataBufferUtils::closeChannel
		);
145 146 147
	}

	/**
148 149 150 151 152
	 * Read the given {@code AsynchronousFileChannel} into a <strong>read-once</strong> {@code Flux}
	 * of {@code DataBuffer}s. Closes the channel when the flux is terminated.
	 * <p>The resulting {@code Flux} can only be subscribed to once. See
	 * {@link #readAsynchronousFileChannel(Callable, DataBufferFactory, int)} for a variant that
	 * supports multiple subscriptions.
153 154 155 156
	 * @param channel the channel to read from
	 * @param dataBufferFactory the factory to create data buffers with
	 * @param bufferSize the maximum size of the data buffers
	 * @return a flux of data buffers read from the given channel
157 158 159
	 * @deprecated as of Spring 5.0.3, in favor of
	 * {@link #readAsynchronousFileChannel(Callable, DataBufferFactory, int)}, to be removed in
	 * Spring 5.1
160
	 */
161
	@Deprecated
J
Juergen Hoeller 已提交
162 163 164
	public static Flux<DataBuffer> read(
			AsynchronousFileChannel channel, DataBufferFactory dataBufferFactory, int bufferSize) {

165
		return readAsynchronousFileChannel(() -> channel, dataBufferFactory, bufferSize);
166 167 168
	}

	/**
169 170
	 * Read the given {@code AsynchronousFileChannel} into a <strong>read-once</strong> {@code Flux}
	 * of {@code DataBuffer}s, starting at the given position. Closes the channel when the flux is
171
	 * terminated.
172 173 174
	 * <p>The resulting {@code Flux} can only be subscribed to once. See
	 * {@link #readAsynchronousFileChannel(Callable, long, DataBufferFactory, int)} for a variant
	 * that supports multiple subscriptions.
175 176 177 178 179
	 * @param channel the channel to read from
	 * @param position the position to start reading from
	 * @param dataBufferFactory the factory to create data buffers with
	 * @param bufferSize the maximum size of the data buffers
	 * @return a flux of data buffers read from the given channel
180 181 182
	 * @deprecated as of Spring 5.0.3, in favor of
	 * {@link #readAsynchronousFileChannel(Callable, long, DataBufferFactory, int)}, to be removed
	 * in Spring 5.1
183
	 */
184
	@Deprecated
J
Juergen Hoeller 已提交
185 186 187
	public static Flux<DataBuffer> read(
			AsynchronousFileChannel channel, long position, DataBufferFactory dataBufferFactory, int bufferSize) {

188 189
		return readAsynchronousFileChannel(() -> channel, position, dataBufferFactory, bufferSize);
	}
190

191 192 193 194 195 196 197 198 199
	/**
	 * Obtain a {@code AsynchronousFileChannel} from the given supplier, and read it into a
	 * {@code Flux} of {@code DataBuffer}s. Closes the channel when the flux is terminated.
	 * @param channelSupplier the supplier for the channel to read from
	 * @param dataBufferFactory the factory to create data buffers with
	 * @param bufferSize the maximum size of the data buffers
	 * @return a flux of data buffers read from the given channel
	 */
	public static Flux<DataBuffer> readAsynchronousFileChannel(
J
Juergen Hoeller 已提交
200
			Callable<AsynchronousFileChannel> channelSupplier, DataBufferFactory dataBufferFactory, int bufferSize) {
201 202 203 204 205 206

		return readAsynchronousFileChannel(channelSupplier, 0, dataBufferFactory, bufferSize);
	}

	/**
	 * Obtain a {@code AsynchronousFileChannel} from the given supplier, and read it into a
J
Juergen Hoeller 已提交
207 208
	 * {@code Flux} of {@code DataBuffer}s, starting at the given position. Closes the
	 * channel when the flux is terminated.
209 210 211 212 213 214
	 * @param channelSupplier the supplier for the channel to read from
	 * @param position the position to start reading from
	 * @param dataBufferFactory the factory to create data buffers with
	 * @param bufferSize the maximum size of the data buffers
	 * @return a flux of data buffers read from the given channel
	 */
J
Juergen Hoeller 已提交
215
	public static Flux<DataBuffer> readAsynchronousFileChannel(Callable<AsynchronousFileChannel> channelSupplier,
216 217 218
			long position, DataBufferFactory dataBufferFactory, int bufferSize) {

		Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
219 220
		Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
		Assert.isTrue(position >= 0, "'position' must be >= 0");
A
Arjen Poutsma 已提交
221
		Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
222

A
Arjen Poutsma 已提交
223 224
		DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(bufferSize);
		ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
225

226 227 228 229 230 231 232 233
		return Flux.using(channelSupplier,
				channel -> Flux.create(sink -> {
							CompletionHandler<Integer, DataBuffer> completionHandler =
									new AsynchronousFileChannelReadCompletionHandler(channel,
											sink, position, dataBufferFactory, bufferSize);
							channel.read(byteBuffer, position, dataBuffer, completionHandler);
						}),
				DataBufferUtils::closeChannel);
234 235
	}

236 237 238 239
	/**
	 * Read the given {@code Resource} into a {@code Flux} of {@code DataBuffer}s.
	 * <p>If the resource is a file, it is read into an
	 * {@code AsynchronousFileChannel} and turned to {@code Flux} via
240 241 242
	 * {@link #readAsynchronousFileChannel(Callable, DataBufferFactory, int)} or else
	 * fall back to {@link #readByteChannel(Callable, DataBufferFactory, int)}.
	 * Closes the channel when the flux is terminated.
243 244 245 246 247
	 * @param resource the resource to read from
	 * @param dataBufferFactory the factory to create data buffers with
	 * @param bufferSize the maximum size of the data buffers
	 * @return a flux of data buffers read from the given channel
	 */
J
Juergen Hoeller 已提交
248 249
	public static Flux<DataBuffer> read(
			Resource resource, DataBufferFactory dataBufferFactory, int bufferSize) {
250 251 252 253 254 255 256 257 258

		return read(resource, 0, dataBufferFactory, bufferSize);
	}

	/**
	 * Read the given {@code Resource} into a {@code Flux} of {@code DataBuffer}s
	 * starting at the given position.
	 * <p>If the resource is a file, it is read into an
	 * {@code AsynchronousFileChannel} and turned to {@code Flux} via
259 260 261
	 * {@link #readAsynchronousFileChannel(Callable, DataBufferFactory, int)} or else
	 * fall back on {@link #readByteChannel(Callable, DataBufferFactory, int)}.
	 * Closes the channel when the flux is terminated.
262 263 264 265 266 267
	 * @param resource the resource to read from
	 * @param position the position to start reading from
	 * @param dataBufferFactory the factory to create data buffers with
	 * @param bufferSize the maximum size of the data buffers
	 * @return a flux of data buffers read from the given channel
	 */
J
Juergen Hoeller 已提交
268 269
	public static Flux<DataBuffer> read(
			Resource resource, long position, DataBufferFactory dataBufferFactory, int bufferSize) {
270 271 272 273

		try {
			if (resource.isFile()) {
				File file = resource.getFile();
274 275 276
				return readAsynchronousFileChannel(
						() -> AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ),
						position, dataBufferFactory, bufferSize);
277 278 279 280 281 282
			}
		}
		catch (IOException ignore) {
			// fallback to resource.readableChannel(), below
		}

283 284
		Flux<DataBuffer> result = readByteChannel(resource::readableChannel, dataBufferFactory, bufferSize);
		return position == 0 ? result : skipUntilByteCount(result, position);
285 286 287
	}


288 289 290 291
	//---------------------------------------------------------------------
	// Writing
	//---------------------------------------------------------------------

292
	/**
P
Phillip Webb 已提交
293
	 * Write the given stream of {@link DataBuffer DataBuffers} to the given {@code OutputStream}. Does
294 295 296 297
	 * <strong>not</strong> close the output stream when the flux is terminated, and does
	 * <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
	 * source. If releasing is required, then subscribe to the returned {@code Flux} with a
	 * {@link #releaseConsumer()}.
J
Juergen Hoeller 已提交
298
	 * <p>Note that the writing process does not start until the returned {@code Flux} is subscribed to.
299 300
	 * @param source the stream of data buffers to be written
	 * @param outputStream the output stream to write to
301 302
	 * @return a flux containing the same buffers as in {@code source}, that starts the writing
	 * process when subscribed to, and that publishes any writing errors and the completion signal
303
	 */
J
Juergen Hoeller 已提交
304
	public static Flux<DataBuffer> write(Publisher<DataBuffer> source, OutputStream outputStream) {
305 306 307 308 309 310 311 312
		Assert.notNull(source, "'source' must not be null");
		Assert.notNull(outputStream, "'outputStream' must not be null");

		WritableByteChannel channel = Channels.newChannel(outputStream);
		return write(source, channel);
	}

	/**
P
Phillip Webb 已提交
313
	 * Write the given stream of {@link DataBuffer DataBuffers} to the given {@code WritableByteChannel}. Does
314 315 316 317
	 * <strong>not</strong> close the channel when the flux is terminated, and does
	 * <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
	 * source. If releasing is required, then subscribe to the returned {@code Flux} with a
	 * {@link #releaseConsumer()}.
J
Juergen Hoeller 已提交
318
	 * <p>Note that the writing process does not start until the returned {@code Flux} is subscribed to.
319 320
	 * @param source the stream of data buffers to be written
	 * @param channel the channel to write to
321 322
	 * @return a flux containing the same buffers as in {@code source}, that starts the writing
	 * process when subscribed to, and that publishes any writing errors and the completion signal
323
	 */
J
Juergen Hoeller 已提交
324
	public static Flux<DataBuffer> write(Publisher<DataBuffer> source, WritableByteChannel channel) {
325 326 327 328
		Assert.notNull(source, "'source' must not be null");
		Assert.notNull(channel, "'channel' must not be null");

		Flux<DataBuffer> flux = Flux.from(source);
329
		return Flux.create(sink ->
330 331 332 333 334 335
				flux.subscribe(dataBuffer -> {
							try {
								ByteBuffer byteBuffer = dataBuffer.asByteBuffer();
								while (byteBuffer.hasRemaining()) {
									channel.write(byteBuffer);
								}
336
								sink.next(dataBuffer);
337 338 339 340 341 342 343
							}
							catch (IOException ex) {
								sink.error(ex);
							}

						},
						sink::error,
344
						sink::complete));
345 346 347
	}

	/**
P
Phillip Webb 已提交
348
	 * Write the given stream of {@link DataBuffer DataBuffers} to the given {@code AsynchronousFileChannel}.
349 350 351 352
	 * Does <strong>not</strong> close the channel when the flux is terminated, and does
	 * <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
	 * source. If releasing is required, then subscribe to the returned {@code Flux} with a
	 * {@link #releaseConsumer()}.
J
Juergen Hoeller 已提交
353
	 * <p>Note that the writing process does not start until the returned {@code Flux} is subscribed to.
354 355
	 * @param source the stream of data buffers to be written
	 * @param channel the channel to write to
356 357
	 * @return a flux containing the same buffers as in {@code source}, that starts the writing
	 * process when subscribed to, and that publishes any writing errors and the completion signal
358
	 */
J
Juergen Hoeller 已提交
359 360
	public static Flux<DataBuffer> write(
			Publisher<DataBuffer> source, AsynchronousFileChannel channel, long position) {
361 362 363 364 365 366

		Assert.notNull(source, "'source' must not be null");
		Assert.notNull(channel, "'channel' must not be null");
		Assert.isTrue(position >= 0, "'position' must be >= 0");

		Flux<DataBuffer> flux = Flux.from(source);
367 368
		return Flux.create(sink ->
				flux.subscribe(new AsynchronousFileChannelWriteCompletionHandler(sink, channel, position)));
369 370
	}

371
	private static void closeChannel(@Nullable Channel channel) {
J
Juergen Hoeller 已提交
372 373
		if (channel != null && channel.isOpen()) {
			try {
374 375
				channel.close();
			}
J
Juergen Hoeller 已提交
376 377
			catch (IOException ignored) {
			}
378
		}
A
Arjen Poutsma 已提交
379 380
	}

J
Juergen Hoeller 已提交
381

382 383 384 385
	//---------------------------------------------------------------------
	// Various
	//---------------------------------------------------------------------

A
Arjen Poutsma 已提交
386
	/**
387 388 389
	 * Relay buffers from the given {@link Publisher} until the total
	 * {@linkplain DataBuffer#readableByteCount() byte count} reaches
	 * the given maximum byte count, or until the publisher is complete.
A
Arjen Poutsma 已提交
390 391 392 393
	 * @param publisher the publisher to filter
	 * @param maxByteCount the maximum byte count
	 * @return a flux whose maximum byte count is {@code maxByteCount}
	 */
394 395
	public static Flux<DataBuffer> takeUntilByteCount(Publisher<DataBuffer> publisher, long maxByteCount) {
		Assert.notNull(publisher, "Publisher must not be null");
A
Arjen Poutsma 已提交
396
		Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number");
R
Polish  
Rossen Stoyanchev 已提交
397 398 399 400
		AtomicLong countDown = new AtomicLong(maxByteCount);

		return Flux.from(publisher)
				.map(buffer -> {
401
					long count = countDown.addAndGet(-buffer.readableByteCount());
R
Polish  
Rossen Stoyanchev 已提交
402
					return count >= 0 ? buffer : buffer.slice(0, buffer.readableByteCount() + (int) count);
403 404
				})
				.takeUntil(buffer -> countDown.get() <= 0);
405 406
	}

B
Brian Clozel 已提交
407 408 409 410 411 412 413 414 415 416 417 418 419
	/**
	 * Skip buffers from the given {@link Publisher} until the total
	 * {@linkplain DataBuffer#readableByteCount() byte count} reaches
	 * the given maximum byte count, or until the publisher is complete.
	 * @param publisher the publisher to filter
	 * @param maxByteCount the maximum byte count
	 * @return a flux with the remaining part of the given publisher
	 */
	public static Flux<DataBuffer> skipUntilByteCount(Publisher<DataBuffer> publisher, long maxByteCount) {
		Assert.notNull(publisher, "Publisher must not be null");
		Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number");
		AtomicLong byteCountDown = new AtomicLong(maxByteCount);

R
Polish  
Rossen Stoyanchev 已提交
420 421 422 423 424
		return Flux.from(publisher)
				.skipUntil(buffer -> {
					int delta = -buffer.readableByteCount();
					if (byteCountDown.addAndGet(delta) >= 0) {
						DataBufferUtils.release(buffer);
B
Brian Clozel 已提交
425 426
						return false;
					}
R
Polish  
Rossen Stoyanchev 已提交
427 428 429 430 431 432
					return true;
				})
				.map(buffer -> {
					long count = byteCountDown.get();
					if (count < 0) {
						int skipCount = buffer.readableByteCount() + (int) count;
B
Brian Clozel 已提交
433
						byteCountDown.set(0);
R
Polish  
Rossen Stoyanchev 已提交
434
						return buffer.slice(skipCount, buffer.readableByteCount() - skipCount);
B
Brian Clozel 已提交
435
					}
R
Polish  
Rossen Stoyanchev 已提交
436
					return buffer;
B
Brian Clozel 已提交
437 438 439
				});
	}

440
	/**
441
	 * Retain the given data buffer, it it is a {@link PooledDataBuffer}.
442 443 444 445 446 447 448 449 450 451 452
	 * @param dataBuffer the data buffer to retain
	 * @return the retained buffer
	 */
	@SuppressWarnings("unchecked")
	public static <T extends DataBuffer> T retain(T dataBuffer) {
		if (dataBuffer instanceof PooledDataBuffer) {
			return (T) ((PooledDataBuffer) dataBuffer).retain();
		}
		else {
			return dataBuffer;
		}
A
Arjen Poutsma 已提交
453 454
	}

A
Arjen Poutsma 已提交
455
	/**
456
	 * Release the given data buffer, if it is a {@link PooledDataBuffer}.
A
Arjen Poutsma 已提交
457 458 459
	 * @param dataBuffer the data buffer to release
	 * @return {@code true} if the buffer was released; {@code false} otherwise.
	 */
460
	public static boolean release(@Nullable DataBuffer dataBuffer) {
J
Juergen Hoeller 已提交
461
		return (dataBuffer instanceof PooledDataBuffer && ((PooledDataBuffer) dataBuffer).release());
A
Arjen Poutsma 已提交
462 463
	}

464
	/**
465
	 * Return a consumer that calls {@link #release(DataBuffer)} on all
466 467 468 469 470 471
	 * passed data buffers.
	 */
	public static Consumer<DataBuffer> releaseConsumer() {
		// Hand out the shared constant (a method reference to release(DataBuffer))
		// instead of creating a new consumer on every call.
		return RELEASE_CONSUMER;
	}

A
Arjen Poutsma 已提交
472
	/**
473 474 475 476
	 * Return a new {@code DataBuffer} composed of the {@code dataBuffers} elements joined together.
	 * Depending on the {@link DataBuffer} implementation, the returned buffer may be a single
	 * buffer containing all data of the provided buffers, or it may be a true composite that
	 * contains references to the buffers.
477 478 479
	 * <p>If {@code dataBuffers} contains an error signal, then all buffers that preceded the error
	 * will be {@linkplain #release(DataBuffer) released}, and the error is stored in the
	 * returned {@code Mono}.
480 481 482
	 * @param dataBuffers the data buffers that are to be composed
	 * @return a buffer that is composed from the {@code dataBuffers} argument
	 * @since 5.0.3
A
Arjen Poutsma 已提交
483
	 */
484
	public static Mono<DataBuffer> join(Publisher<DataBuffer> dataBuffers) {
485 486 487
		Assert.notNull(dataBuffers, "'dataBuffers' must not be null");

		return Flux.from(dataBuffers)
488
				.onErrorResume(DataBufferUtils::exceptionDataBuffer)
489 490
				.collectList()
				.filter(list -> !list.isEmpty())
491 492 493 494 495 496 497 498
				.flatMap(list -> {
					for (int i = 0; i < list.size(); i++) {
						DataBuffer dataBuffer = list.get(i);
						if (dataBuffer instanceof ExceptionDataBuffer) {
							list.subList(0, i).forEach(DataBufferUtils::release);
							return Mono.error(((ExceptionDataBuffer) dataBuffer).throwable());
						}
					}
499
					DataBufferFactory bufferFactory = list.get(0).factory();
500
					return Mono.just(bufferFactory.join(list));
A
Arjen Poutsma 已提交
501 502 503
				});
	}

504 505 506 507
	/**
	 * Wrap the given error in an {@link ExceptionDataBuffer} marker and emit it as a
	 * regular element.
	 * @param throwable the error to wrap
	 * @return a {@code Mono} emitting the marker buffer
	 */
	private static Mono<DataBuffer> exceptionDataBuffer(Throwable throwable) {
		DataBuffer marker = new ExceptionDataBuffer(throwable);
		return Mono.just(marker);
	}

508

J
Juergen Hoeller 已提交
509
	private static class ReadableByteChannelGenerator implements Consumer<SynchronousSink<DataBuffer>> {
510 511

		private final ReadableByteChannel channel;
A
Arjen Poutsma 已提交
512

513
		private final DataBufferFactory dataBufferFactory;
A
Arjen Poutsma 已提交
514

A
Arjen Poutsma 已提交
515
		private final int bufferSize;
A
Arjen Poutsma 已提交
516

J
Juergen Hoeller 已提交
517 518
		public ReadableByteChannelGenerator(
				ReadableByteChannel channel, DataBufferFactory dataBufferFactory, int bufferSize) {
519 520

			this.channel = channel;
521
			this.dataBufferFactory = dataBufferFactory;
A
Arjen Poutsma 已提交
522
			this.bufferSize = bufferSize;
A
Arjen Poutsma 已提交
523 524 525
		}

		@Override
526
		public void accept(SynchronousSink<DataBuffer> sink) {
A
Arjen Poutsma 已提交
527 528
			boolean release = true;
			DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize);
A
Arjen Poutsma 已提交
529 530
			try {
				int read;
A
Arjen Poutsma 已提交
531
				ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, dataBuffer.capacity());
532
				if ((read = this.channel.read(byteBuffer)) >= 0) {
A
Arjen Poutsma 已提交
533 534
					dataBuffer.writePosition(read);
					release = false;
535
					sink.next(dataBuffer);
A
Arjen Poutsma 已提交
536 537
				}
				else {
538
					sink.complete();
A
Arjen Poutsma 已提交
539 540 541
				}
			}
			catch (IOException ex) {
542
				sink.error(ex);
A
Arjen Poutsma 已提交
543
			}
A
Arjen Poutsma 已提交
544 545 546 547 548
			finally {
				if (release) {
					release(dataBuffer);
				}
			}
A
Arjen Poutsma 已提交
549 550 551
		}
	}

552

553
	private static class AsynchronousFileChannelReadCompletionHandler
A
Arjen Poutsma 已提交
554
			implements CompletionHandler<Integer, DataBuffer> {
555

A
Arjen Poutsma 已提交
556
		private final AsynchronousFileChannel channel;
557

A
Arjen Poutsma 已提交
558
		private final FluxSink<DataBuffer> sink;
559 560 561

		private final DataBufferFactory dataBufferFactory;

A
Arjen Poutsma 已提交
562 563
		private final int bufferSize;

564 565 566 567
		private final AtomicLong position;

		private final AtomicBoolean disposed = new AtomicBoolean();

J
Juergen Hoeller 已提交
568 569
		public AsynchronousFileChannelReadCompletionHandler(AsynchronousFileChannel channel,
				FluxSink<DataBuffer> sink, long position, DataBufferFactory dataBufferFactory, int bufferSize) {
570

A
Arjen Poutsma 已提交
571
			this.channel = channel;
572
			this.sink = sink;
A
Arjen Poutsma 已提交
573
			this.position = new AtomicLong(position);
574
			this.dataBufferFactory = dataBufferFactory;
A
Arjen Poutsma 已提交
575
			this.bufferSize = bufferSize;
576 577 578
		}

		@Override
A
Arjen Poutsma 已提交
579
		public void completed(Integer read, DataBuffer dataBuffer) {
580
			if (read != -1) {
A
Arjen Poutsma 已提交
581 582 583
				long pos = this.position.addAndGet(read);
				dataBuffer.writePosition(read);
				this.sink.next(dataBuffer);
584
				if (!this.disposed.get()) {
J
Juergen Hoeller 已提交
585
					DataBuffer newDataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize);
A
Arjen Poutsma 已提交
586 587
					ByteBuffer newByteBuffer = newDataBuffer.asByteBuffer(0, this.bufferSize);
					this.channel.read(newByteBuffer, pos, newDataBuffer, this);
588 589 590
				}
			}
			else {
A
Arjen Poutsma 已提交
591
				release(dataBuffer);
592
				this.sink.complete();
593 594 595 596
			}
		}

		@Override
A
Arjen Poutsma 已提交
597 598
		public void failed(Throwable exc, DataBuffer dataBuffer) {
			release(dataBuffer);
599
			this.sink.error(exc);
600 601
		}
	}
602

603

J
Juergen Hoeller 已提交
604
	private static class AsynchronousFileChannelWriteCompletionHandler extends BaseSubscriber<DataBuffer>
605 606
			implements CompletionHandler<Integer, ByteBuffer> {

607
		private final FluxSink<DataBuffer> sink;
608 609 610

		private final AsynchronousFileChannel channel;

611 612
		private final AtomicBoolean completed = new AtomicBoolean();

613
		private final AtomicLong position;
614 615 616 617 618

		@Nullable
		private DataBuffer dataBuffer;

		public AsynchronousFileChannelWriteCompletionHandler(
619
				FluxSink<DataBuffer> sink, AsynchronousFileChannel channel, long position) {
J
Juergen Hoeller 已提交
620

621 622
			this.sink = sink;
			this.channel = channel;
623
			this.position = new AtomicLong(position);
624 625 626 627 628 629 630 631 632 633 634
		}

		@Override
		protected void hookOnSubscribe(Subscription subscription) {
			request(1);
		}

		@Override
		protected void hookOnNext(DataBuffer value) {
			this.dataBuffer = value;
			ByteBuffer byteBuffer = value.asByteBuffer();
635
			this.channel.write(byteBuffer, this.position.get(), byteBuffer, this);
636 637 638 639 640 641 642 643 644
		}

		@Override
		protected void hookOnError(Throwable throwable) {
			this.sink.error(throwable);
		}

		@Override
		protected void hookOnComplete() {
645
			this.completed.set(true);
646 647 648 649

			if (this.dataBuffer == null) {
				this.sink.complete();
			}
650 651 652 653
		}

		@Override
		public void completed(Integer written, ByteBuffer byteBuffer) {
A
Arjen Poutsma 已提交
654
			long pos = this.position.addAndGet(written);
655
			if (byteBuffer.hasRemaining()) {
A
Arjen Poutsma 已提交
656
				this.channel.write(byteBuffer, pos, byteBuffer, this);
657 658
				return;
			}
659
			if (this.dataBuffer != null) {
660
				this.sink.next(this.dataBuffer);
661
				this.dataBuffer = null;
662 663 664
			}
			if (this.completed.get()) {
				this.sink.complete();
665 666 667 668 669 670 671 672 673 674 675
			}
			else {
				request(1);
			}
		}

		@Override
		public void failed(Throwable exc, ByteBuffer byteBuffer) {
			this.sink.error(exc);
		}
	}
J
Juergen Hoeller 已提交
676

677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825
	/**
	 * DataBuffer implementation that holds a {@link Throwable}, used in {@link #join(Publisher)}.
	 * <p>It serves purely as a marker element: {@code join} substitutes it for an error
	 * signal, later detects it in the collected list, and rethrows the wrapped
	 * {@code Throwable}. It carries no data, which is why every {@link DataBuffer}
	 * operation throws {@link UnsupportedOperationException}.
	 */
	private static final class ExceptionDataBuffer implements DataBuffer {

		// The error this marker stands in for.
		private final Throwable throwable;


		public ExceptionDataBuffer(Throwable throwable) {
			this.throwable = throwable;
		}

		/**
		 * Return the wrapped error.
		 */
		public Throwable throwable() {
			return this.throwable;
		}

		// Unsupported: none of the DataBuffer operations below are implemented;
		// each one unconditionally throws UnsupportedOperationException.

		@Override
		public DataBufferFactory factory() {
			throw new UnsupportedOperationException();
		}

		@Override
		public int indexOf(IntPredicate predicate, int fromIndex) {
			throw new UnsupportedOperationException();
		}

		@Override
		public int lastIndexOf(IntPredicate predicate, int fromIndex) {
			throw new UnsupportedOperationException();
		}

		@Override
		public int readableByteCount() {
			throw new UnsupportedOperationException();
		}

		@Override
		public int writableByteCount() {
			throw new UnsupportedOperationException();
		}

		@Override
		public int capacity() {
			throw new UnsupportedOperationException();
		}

		@Override
		public DataBuffer capacity(int capacity) {
			throw new UnsupportedOperationException();
		}

		@Override
		public int readPosition() {
			throw new UnsupportedOperationException();
		}

		@Override
		public DataBuffer readPosition(int readPosition) {
			throw new UnsupportedOperationException();
		}

		@Override
		public int writePosition() {
			throw new UnsupportedOperationException();
		}

		@Override
		public DataBuffer writePosition(int writePosition) {
			throw new UnsupportedOperationException();
		}

		@Override
		public byte getByte(int index) {
			throw new UnsupportedOperationException();
		}

		@Override
		public byte read() {
			throw new UnsupportedOperationException();
		}

		@Override
		public DataBuffer read(byte[] destination) {
			throw new UnsupportedOperationException();
		}

		@Override
		public DataBuffer read(byte[] destination, int offset, int length) {
			throw new UnsupportedOperationException();
		}

		@Override
		public DataBuffer write(byte b) {
			throw new UnsupportedOperationException();
		}

		@Override
		public DataBuffer write(byte[] source) {
			throw new UnsupportedOperationException();
		}

		@Override
		public DataBuffer write(byte[] source, int offset, int length) {
			throw new UnsupportedOperationException();
		}

		@Override
		public DataBuffer write(DataBuffer... buffers) {
			throw new UnsupportedOperationException();
		}

		@Override
		public DataBuffer write(ByteBuffer... buffers) {
			throw new UnsupportedOperationException();
		}

		@Override
		public DataBuffer slice(int index, int length) {
			throw new UnsupportedOperationException();
		}

		@Override
		public ByteBuffer asByteBuffer() {
			throw new UnsupportedOperationException();
		}

		@Override
		public ByteBuffer asByteBuffer(int index, int length) {
			throw new UnsupportedOperationException();
		}

		@Override
		public InputStream asInputStream() {
			throw new UnsupportedOperationException();
		}

		@Override
		public InputStream asInputStream(boolean releaseOnClose) {
			throw new UnsupportedOperationException();
		}

		@Override
		public OutputStream asOutputStream() {
			throw new UnsupportedOperationException();
		}
	}

A
Arjen Poutsma 已提交
826
}