DataBufferUtils.java 25.0 KB
Newer Older
A
Arjen Poutsma 已提交
1
/*
2
 * Copyright 2002-2018 the original author or authors.
A
Arjen Poutsma 已提交
3 4 5 6 7 8 9 10 11 12 13 14 15 16
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

17
package org.springframework.core.io.buffer;
A
Arjen Poutsma 已提交
18

19
import java.io.File;
A
Arjen Poutsma 已提交
20
import java.io.IOException;
A
Arjen Poutsma 已提交
21
import java.io.InputStream;
22
import java.io.OutputStream;
A
Arjen Poutsma 已提交
23
import java.nio.ByteBuffer;
24 25
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.Channel;
A
Arjen Poutsma 已提交
26
import java.nio.channels.Channels;
27
import java.nio.channels.CompletionHandler;
A
Arjen Poutsma 已提交
28
import java.nio.channels.ReadableByteChannel;
29
import java.nio.channels.WritableByteChannel;
30
import java.nio.file.StandardOpenOption;
31
import java.util.concurrent.Callable;
32
import java.util.concurrent.atomic.AtomicBoolean;
A
Arjen Poutsma 已提交
33
import java.util.concurrent.atomic.AtomicLong;
34
import java.util.function.Consumer;
A
Arjen Poutsma 已提交
35 36

import org.reactivestreams.Publisher;
37 38
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
A
Arjen Poutsma 已提交
39
import reactor.core.publisher.Flux;
40
import reactor.core.publisher.FluxSink;
A
Arjen Poutsma 已提交
41
import reactor.core.publisher.Mono;
42
import reactor.core.publisher.SynchronousSink;
A
Arjen Poutsma 已提交
43

44
import org.springframework.core.io.Resource;
45
import org.springframework.lang.Nullable;
A
Arjen Poutsma 已提交
46 47
import org.springframework.util.Assert;

B
Brian Clozel 已提交
48
/**
49 50
 * Utility class for working with {@link DataBuffer}s.
 *
A
Arjen Poutsma 已提交
51
 * @author Arjen Poutsma
B
Brian Clozel 已提交
52
 * @author Brian Clozel
53
 * @since 5.0
A
Arjen Poutsma 已提交
54 55 56
 */
public abstract class DataBufferUtils {

57 58 59 60 61 62
	private static final Consumer<DataBuffer> RELEASE_CONSUMER = DataBufferUtils::release;

	//---------------------------------------------------------------------
	// Reading
	//---------------------------------------------------------------------

A
Arjen Poutsma 已提交
63
	/**
64
	 * Read the given {@code InputStream} into a <strong>read-once</strong> {@code Flux} of
A
Arjen Poutsma 已提交
65
	 * {@code DataBuffer}s. Closes the input stream when the flux is terminated.
66 67 68
	 * <p>The resulting {@code Flux} can only be subscribed to once. See
	 * {@link #readInputStream(Callable, DataBufferFactory, int)} for a variant that supports
	 * multiple subscriptions.
A
Arjen Poutsma 已提交
69
	 * @param inputStream the input stream to read from
70
	 * @param dataBufferFactory the factory to create data buffers with
A
Arjen Poutsma 已提交
71 72
	 * @param bufferSize the maximum size of the data buffers
	 * @return a flux of data buffers read from the given channel
73 74
	 * @deprecated as of Spring 5.0.3, in favor of
	 * {@link #readInputStream(Callable, DataBufferFactory, int)}, to be removed in Spring 5.1
A
Arjen Poutsma 已提交
75
	 */
76
	@Deprecated
A
Arjen Poutsma 已提交
77
	public static Flux<DataBuffer> read(InputStream inputStream,
78
			DataBufferFactory dataBufferFactory, int bufferSize) {
79 80
		return readInputStream(() -> inputStream, dataBufferFactory, bufferSize);
	}
81

82 83 84 85 86 87 88 89 90 91
	/**
	 * Obtain a {@link InputStream} from the given supplier, and read it into a {@code Flux} of
	 * {@code DataBuffer}s. Closes the input stream when the flux is terminated.
	 * @param inputStreamSupplier the supplier for the input stream to read from
	 * @param dataBufferFactory the factory to create data buffers with
	 * @param bufferSize the maximum size of the data buffers
	 * @return a flux of data buffers read from the given channel
	 */
	public static Flux<DataBuffer> readInputStream(Callable<InputStream> inputStreamSupplier,
			DataBufferFactory dataBufferFactory, int bufferSize) {
A
Arjen Poutsma 已提交
92

93 94 95 96
		Assert.notNull(inputStreamSupplier, "'inputStreamSupplier' must not be null");

		return readByteChannel(() -> Channels.newChannel(inputStreamSupplier.call()),
				dataBufferFactory, bufferSize);
A
Arjen Poutsma 已提交
97 98 99
	}

	/**
100
	 * Read the given {@code ReadableByteChannel} into a <strong>read-once</strong> {@code Flux} of
A
Arjen Poutsma 已提交
101
	 * {@code DataBuffer}s. Closes the channel when the flux is terminated.
102 103 104
	 * <p>The resulting {@code Flux} can only be subscribed to once. See
	 * {@link #readByteChannel(Callable, DataBufferFactory, int)} for a variant that supports
	 * multiple subscriptions.
A
Arjen Poutsma 已提交
105
	 * @param channel the channel to read from
106
	 * @param dataBufferFactory the factory to create data buffers with
A
Arjen Poutsma 已提交
107 108
	 * @param bufferSize the maximum size of the data buffers
	 * @return a flux of data buffers read from the given channel
109 110
	 * @deprecated as of Spring 5.0.3, in favor of
	 * {@link #readByteChannel(Callable, DataBufferFactory, int)}, to be removed in Spring 5.1
A
Arjen Poutsma 已提交
111
	 */
112
	@Deprecated
A
Arjen Poutsma 已提交
113
	public static Flux<DataBuffer> read(ReadableByteChannel channel,
114
			DataBufferFactory dataBufferFactory, int bufferSize) {
115 116
		return readByteChannel(() -> channel, dataBufferFactory, bufferSize);
	}
117

118 119 120 121 122 123 124 125 126 127 128 129 130
	/**
	 * Obtain a {@link ReadableByteChannel} from the given supplier, and read it into a
	 * {@code Flux} of {@code DataBuffer}s. Closes the channel when the flux is terminated.
	 * @param channelSupplier the supplier for the channel to read from
	 * @param dataBufferFactory the factory to create data buffers with
	 * @param bufferSize the maximum size of the data buffers
	 * @return a flux of data buffers read from the given channel
	 */
	public static Flux<DataBuffer> readByteChannel(Callable<ReadableByteChannel> channelSupplier,
			DataBufferFactory dataBufferFactory, int bufferSize) {

		Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
		Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
A
Arjen Poutsma 已提交
131
		Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
A
Arjen Poutsma 已提交
132

133 134 135 136 137 138 139 140 141
		return Flux.using(channelSupplier,
				channel -> {
					ReadableByteChannelGenerator generator =
							new ReadableByteChannelGenerator(channel, dataBufferFactory,
									bufferSize);
					return Flux.generate(generator);
				},
				DataBufferUtils::closeChannel
		);
142 143 144
	}

	/**
145 146 147 148 149
	 * Read the given {@code AsynchronousFileChannel} into a <strong>read-once</strong> {@code Flux}
	 * of {@code DataBuffer}s. Closes the channel when the flux is terminated.
	 * <p>The resulting {@code Flux} can only be subscribed to once. See
	 * {@link #readAsynchronousFileChannel(Callable, DataBufferFactory, int)} for a variant that
	 * supports multiple subscriptions.
150 151 152 153
	 * @param channel the channel to read from
	 * @param dataBufferFactory the factory to create data buffers with
	 * @param bufferSize the maximum size of the data buffers
	 * @return a flux of data buffers read from the given channel
154 155 156
	 * @deprecated as of Spring 5.0.3, in favor of
	 * {@link #readAsynchronousFileChannel(Callable, DataBufferFactory, int)}, to be removed in
	 * Spring 5.1
157
	 */
158
	@Deprecated
159 160
	public static Flux<DataBuffer> read(AsynchronousFileChannel channel,
			DataBufferFactory dataBufferFactory, int bufferSize) {
161
		return readAsynchronousFileChannel(() -> channel, dataBufferFactory, bufferSize);
162 163 164
	}

	/**
165 166
	 * Read the given {@code AsynchronousFileChannel} into a <strong>read-once</strong> {@code Flux}
	 * of {@code DataBuffer}s, starting at the given position. Closes the channel when the flux is
167
	 * terminated.
168 169 170
	 * <p>The resulting {@code Flux} can only be subscribed to once. See
	 * {@link #readAsynchronousFileChannel(Callable, long, DataBufferFactory, int)} for a variant
	 * that supports multiple subscriptions.
171 172 173 174 175
	 * @param channel the channel to read from
	 * @param position the position to start reading from
	 * @param dataBufferFactory the factory to create data buffers with
	 * @param bufferSize the maximum size of the data buffers
	 * @return a flux of data buffers read from the given channel
176 177 178
	 * @deprecated as of Spring 5.0.3, in favor of
	 * {@link #readAsynchronousFileChannel(Callable, long, DataBufferFactory, int)}, to be removed
	 * in Spring 5.1
179
	 */
180
	@Deprecated
181 182
	public static Flux<DataBuffer> read(AsynchronousFileChannel channel,
			long position, DataBufferFactory dataBufferFactory, int bufferSize) {
183 184
		return readAsynchronousFileChannel(() -> channel, position, dataBufferFactory, bufferSize);
	}
185

186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
	/**
	 * Obtain a {@code AsynchronousFileChannel} from the given supplier, and read it into a
	 * {@code Flux} of {@code DataBuffer}s. Closes the channel when the flux is terminated.
	 * @param channelSupplier the supplier for the channel to read from
	 * @param dataBufferFactory the factory to create data buffers with
	 * @param bufferSize the maximum size of the data buffers
	 * @return a flux of data buffers read from the given channel
	 */
	public static Flux<DataBuffer> readAsynchronousFileChannel(
			Callable<AsynchronousFileChannel> channelSupplier,
			DataBufferFactory dataBufferFactory, int bufferSize) {

		return readAsynchronousFileChannel(channelSupplier, 0, dataBufferFactory, bufferSize);
	}

	/**
	 * Obtain a {@code AsynchronousFileChannel} from the given supplier, and read it into a
	 * {@code Flux} of {@code DataBuffer}s, starting at the given position. Closes the channel when
	 * the flux is terminated.
	 * @param channelSupplier the supplier for the channel to read from
	 * @param position the position to start reading from
	 * @param dataBufferFactory the factory to create data buffers with
	 * @param bufferSize the maximum size of the data buffers
	 * @return a flux of data buffers read from the given channel
	 */
	public static Flux<DataBuffer> readAsynchronousFileChannel(
			Callable<AsynchronousFileChannel> channelSupplier,
			long position, DataBufferFactory dataBufferFactory, int bufferSize) {

		Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
216 217
		Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
		Assert.isTrue(position >= 0, "'position' must be >= 0");
A
Arjen Poutsma 已提交
218
		Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
219

A
Arjen Poutsma 已提交
220 221
		DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(bufferSize);
		ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
222

223 224 225 226 227 228 229 230
		return Flux.using(channelSupplier,
				channel -> Flux.create(sink -> {
							CompletionHandler<Integer, DataBuffer> completionHandler =
									new AsynchronousFileChannelReadCompletionHandler(channel,
											sink, position, dataBufferFactory, bufferSize);
							channel.read(byteBuffer, position, dataBuffer, completionHandler);
						}),
				DataBufferUtils::closeChannel);
231 232
	}

233 234 235 236
	/**
	 * Read the given {@code Resource} into a {@code Flux} of {@code DataBuffer}s.
	 * <p>If the resource is a file, it is read into an
	 * {@code AsynchronousFileChannel} and turned to {@code Flux} via
237 238 239
	 * {@link #readAsynchronousFileChannel(Callable, DataBufferFactory, int)} or else
	 * fall back to {@link #readByteChannel(Callable, DataBufferFactory, int)}.
	 * Closes the channel when the flux is terminated.
240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255
	 * @param resource the resource to read from
	 * @param dataBufferFactory the factory to create data buffers with
	 * @param bufferSize the maximum size of the data buffers
	 * @return a flux of data buffers read from the given channel
	 */
	public static Flux<DataBuffer> read(Resource resource,
			DataBufferFactory dataBufferFactory, int bufferSize) {

		return read(resource, 0, dataBufferFactory, bufferSize);
	}

	/**
	 * Read the given {@code Resource} into a {@code Flux} of {@code DataBuffer}s
	 * starting at the given position.
	 * <p>If the resource is a file, it is read into an
	 * {@code AsynchronousFileChannel} and turned to {@code Flux} via
256 257 258
	 * {@link #readAsynchronousFileChannel(Callable, DataBufferFactory, int)} or else
	 * fall back on {@link #readByteChannel(Callable, DataBufferFactory, int)}.
	 * Closes the channel when the flux is terminated.
259 260 261 262 263 264 265 266 267 268 269 270
	 * @param resource the resource to read from
	 * @param position the position to start reading from
	 * @param dataBufferFactory the factory to create data buffers with
	 * @param bufferSize the maximum size of the data buffers
	 * @return a flux of data buffers read from the given channel
	 */
	public static Flux<DataBuffer> read(Resource resource, long position,
			DataBufferFactory dataBufferFactory, int bufferSize) {

		try {
			if (resource.isFile()) {
				File file = resource.getFile();
271 272 273 274

				return readAsynchronousFileChannel(
						() -> AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ),
						position, dataBufferFactory, bufferSize);
275 276 277 278 279 280
			}
		}
		catch (IOException ignore) {
			// fallback to resource.readableChannel(), below
		}

281 282
		Flux<DataBuffer> result = readByteChannel(resource::readableChannel, dataBufferFactory, bufferSize);
		return position == 0 ? result : skipUntilByteCount(result, position);
283 284 285
	}


286 287


288 289 290 291
	//---------------------------------------------------------------------
	// Writing
	//---------------------------------------------------------------------

292 293
	/**
	 * Write the given stream of {@link DataBuffer}s to the given {@code OutputStream}. Does
294 295 296 297 298
	 * <strong>not</strong> close the output stream when the flux is terminated, and does
	 * <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
	 * source. If releasing is required, then subscribe to the returned {@code Flux} with a
	 * {@link #releaseConsumer()}.
	 * <p>Note that the writing process does not start until the returned {@code Flux} is subscribed
299 300 301
	 * to.
	 * @param source the stream of data buffers to be written
	 * @param outputStream the output stream to write to
302 303
	 * @return a flux containing the same buffers as in {@code source}, that starts the writing
	 * process when subscribed to, and that publishes any writing errors and the completion signal
304
	 */
305
	public static Flux<DataBuffer> write(Publisher<DataBuffer> source,
306 307 308 309 310 311 312 313 314 315 316
			OutputStream outputStream) {

		Assert.notNull(source, "'source' must not be null");
		Assert.notNull(outputStream, "'outputStream' must not be null");

		WritableByteChannel channel = Channels.newChannel(outputStream);
		return write(source, channel);
	}

	/**
	 * Write the given stream of {@link DataBuffer}s to the given {@code WritableByteChannel}. Does
317 318 319 320 321
	 * <strong>not</strong> close the channel when the flux is terminated, and does
	 * <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
	 * source. If releasing is required, then subscribe to the returned {@code Flux} with a
	 * {@link #releaseConsumer()}.
	 * <p>Note that the writing process does not start until the returned {@code Flux} is subscribed
322 323 324
	 * to.
	 * @param source the stream of data buffers to be written
	 * @param channel the channel to write to
325 326
	 * @return a flux containing the same buffers as in {@code source}, that starts the writing
	 * process when subscribed to, and that publishes any writing errors and the completion signal
327
	 */
328
	public static Flux<DataBuffer> write(Publisher<DataBuffer> source,
329 330 331 332 333 334 335
			WritableByteChannel channel) {

		Assert.notNull(source, "'source' must not be null");
		Assert.notNull(channel, "'channel' must not be null");

		Flux<DataBuffer> flux = Flux.from(source);

336
		return Flux.create(sink ->
337 338 339 340 341 342
				flux.subscribe(dataBuffer -> {
							try {
								ByteBuffer byteBuffer = dataBuffer.asByteBuffer();
								while (byteBuffer.hasRemaining()) {
									channel.write(byteBuffer);
								}
343
								sink.next(dataBuffer);
344 345 346 347 348 349 350
							}
							catch (IOException ex) {
								sink.error(ex);
							}

						},
						sink::error,
351
						sink::complete));
352 353 354 355
	}

	/**
	 * Write the given stream of {@link DataBuffer}s to the given {@code AsynchronousFileChannel}.
356 357 358 359 360
	 * Does <strong>not</strong> close the channel when the flux is terminated, and does
	 * <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
	 * source. If releasing is required, then subscribe to the returned {@code Flux} with a
	 * {@link #releaseConsumer()}.
	 * <p>Note that the writing process does not start until the returned {@code Flux} is subscribed
361 362 363
	 * to.
	 * @param source the stream of data buffers to be written
	 * @param channel the channel to write to
364 365
	 * @return a flux containing the same buffers as in {@code source}, that starts the writing
	 * process when subscribed to, and that publishes any writing errors and the completion signal
366
	 */
367
	public static Flux<DataBuffer> write(Publisher<DataBuffer> source, AsynchronousFileChannel channel,
368 369 370 371 372 373 374 375
			long position) {

		Assert.notNull(source, "'source' must not be null");
		Assert.notNull(channel, "'channel' must not be null");
		Assert.isTrue(position >= 0, "'position' must be >= 0");

		Flux<DataBuffer> flux = Flux.from(source);

376
		return Flux.create(sink -> {
377 378 379 380 381 382 383
			BaseSubscriber<DataBuffer> subscriber =
					new AsynchronousFileChannelWriteCompletionHandler(sink, channel, position);
			flux.subscribe(subscriber);
		});
	}


384
	private static void closeChannel(@Nullable Channel channel) {
385
		try {
386
			if (channel != null && channel.isOpen()) {
387 388 389 390 391
				channel.close();
			}
		}
		catch (IOException ignored) {
		}
A
Arjen Poutsma 已提交
392 393
	}

394 395 396 397
	//---------------------------------------------------------------------
	// Various
	//---------------------------------------------------------------------

A
Arjen Poutsma 已提交
398
	/**
399 400 401
	 * Relay buffers from the given {@link Publisher} until the total
	 * {@linkplain DataBuffer#readableByteCount() byte count} reaches
	 * the given maximum byte count, or until the publisher is complete.
A
Arjen Poutsma 已提交
402 403 404 405
	 * @param publisher the publisher to filter
	 * @param maxByteCount the maximum byte count
	 * @return a flux whose maximum byte count is {@code maxByteCount}
	 */
406 407
	public static Flux<DataBuffer> takeUntilByteCount(Publisher<DataBuffer> publisher, long maxByteCount) {
		Assert.notNull(publisher, "Publisher must not be null");
A
Arjen Poutsma 已提交
408
		Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number");
409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429
		AtomicLong byteCountDown = new AtomicLong(maxByteCount);

		return Flux.from(publisher).
				takeWhile(dataBuffer -> {
					int delta = -dataBuffer.readableByteCount();
					long currentCount = byteCountDown.getAndAdd(delta);
					return currentCount >= 0;
				}).
				map(dataBuffer -> {
					long currentCount = byteCountDown.get();
					if (currentCount >= 0) {
						return dataBuffer;
					}
					else {
						// last buffer
						int size = (int) (currentCount + dataBuffer.readableByteCount());
						return dataBuffer.slice(0, size);
					}
				});
	}

B
Brian Clozel 已提交
430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465
	/**
	 * Skip buffers from the given {@link Publisher} until the total
	 * {@linkplain DataBuffer#readableByteCount() byte count} reaches
	 * the given maximum byte count, or until the publisher is complete.
	 * @param publisher the publisher to filter
	 * @param maxByteCount the maximum byte count
	 * @return a flux with the remaining part of the given publisher
	 */
	public static Flux<DataBuffer> skipUntilByteCount(Publisher<DataBuffer> publisher, long maxByteCount) {
		Assert.notNull(publisher, "Publisher must not be null");
		Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number");
		AtomicLong byteCountDown = new AtomicLong(maxByteCount);

		return Flux.from(publisher).
				skipUntil(dataBuffer -> {
					int delta = -dataBuffer.readableByteCount();
					long currentCount = byteCountDown.addAndGet(delta);
					if(currentCount < 0) {
						return true;
					} else {
						DataBufferUtils.release(dataBuffer);
						return false;
					}
				}).
				map(dataBuffer -> {
					long currentCount = byteCountDown.get();
					// slice first buffer, then let others flow through
					if (currentCount < 0) {
						int skip = (int) (currentCount + dataBuffer.readableByteCount());
						byteCountDown.set(0);
						return dataBuffer.slice(skip, dataBuffer.readableByteCount() - skip);
					}
					return dataBuffer;
				});
	}

466
	/**
467
	 * Retain the given data buffer, it it is a {@link PooledDataBuffer}.
468 469 470 471 472 473 474 475 476 477 478
	 * @param dataBuffer the data buffer to retain
	 * @return the retained buffer
	 */
	@SuppressWarnings("unchecked")
	public static <T extends DataBuffer> T retain(T dataBuffer) {
		if (dataBuffer instanceof PooledDataBuffer) {
			return (T) ((PooledDataBuffer) dataBuffer).retain();
		}
		else {
			return dataBuffer;
		}
A
Arjen Poutsma 已提交
479 480
	}

A
Arjen Poutsma 已提交
481
	/**
482
	 * Release the given data buffer, if it is a {@link PooledDataBuffer}.
A
Arjen Poutsma 已提交
483 484 485
	 * @param dataBuffer the data buffer to release
	 * @return {@code true} if the buffer was released; {@code false} otherwise.
	 */
486
	public static boolean release(@Nullable DataBuffer dataBuffer) {
A
Arjen Poutsma 已提交
487 488 489 490 491 492
		if (dataBuffer instanceof PooledDataBuffer) {
			return ((PooledDataBuffer) dataBuffer).release();
		}
		return false;
	}

493
	/**
494
	 * Return a consumer that calls {@link #release(DataBuffer)} on all
495 496 497 498 499 500
	 * passed data buffers.
	 */
	public static Consumer<DataBuffer> releaseConsumer() {
		return RELEASE_CONSUMER;
	}

A
Arjen Poutsma 已提交
501
	/**
502 503 504 505 506 507 508
	 * Return a new {@code DataBuffer} composed of the {@code dataBuffers} elements joined together.
	 * Depending on the {@link DataBuffer} implementation, the returned buffer may be a single
	 * buffer containing all data of the provided buffers, or it may be a true composite that
	 * contains references to the buffers.
	 * @param dataBuffers the data buffers that are to be composed
	 * @return a buffer that is composed from the {@code dataBuffers} argument
	 * @since 5.0.3
A
Arjen Poutsma 已提交
509
	 */
510
	public static Mono<DataBuffer> join(Publisher<DataBuffer> dataBuffers) {
511 512 513 514 515 516 517 518
		Assert.notNull(dataBuffers, "'dataBuffers' must not be null");

		return Flux.from(dataBuffers)
				.collectList()
				.filter(list -> !list.isEmpty())
				.map(list -> {
					DataBufferFactory bufferFactory = list.get(0).factory();
					return bufferFactory.join(list);
A
Arjen Poutsma 已提交
519 520 521
				});
	}

522

523
	private static class ReadableByteChannelGenerator
524 525 526
			implements Consumer<SynchronousSink<DataBuffer>> {

		private final ReadableByteChannel channel;
A
Arjen Poutsma 已提交
527

528
		private final DataBufferFactory dataBufferFactory;
A
Arjen Poutsma 已提交
529

A
Arjen Poutsma 已提交
530
		private final int bufferSize;
A
Arjen Poutsma 已提交
531

532 533 534 535 536

		public ReadableByteChannelGenerator(ReadableByteChannel channel,
				DataBufferFactory dataBufferFactory, int bufferSize) {

			this.channel = channel;
537
			this.dataBufferFactory = dataBufferFactory;
A
Arjen Poutsma 已提交
538
			this.bufferSize = bufferSize;
A
Arjen Poutsma 已提交
539 540 541
		}

		@Override
542
		public void accept(SynchronousSink<DataBuffer> sink) {
A
Arjen Poutsma 已提交
543 544
			boolean release = true;
			DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize);
A
Arjen Poutsma 已提交
545 546
			try {
				int read;
A
Arjen Poutsma 已提交
547
				ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, dataBuffer.capacity());
548
				if ((read = this.channel.read(byteBuffer)) >= 0) {
A
Arjen Poutsma 已提交
549 550
					dataBuffer.writePosition(read);
					release = false;
551
					sink.next(dataBuffer);
A
Arjen Poutsma 已提交
552 553
				}
				else {
554
					sink.complete();
A
Arjen Poutsma 已提交
555 556 557
				}
			}
			catch (IOException ex) {
558
				sink.error(ex);
A
Arjen Poutsma 已提交
559
			}
A
Arjen Poutsma 已提交
560 561 562 563 564
			finally {
				if (release) {
					release(dataBuffer);
				}
			}
A
Arjen Poutsma 已提交
565
		}
566

A
Arjen Poutsma 已提交
567 568
	}

569

570
	private static class AsynchronousFileChannelReadCompletionHandler
A
Arjen Poutsma 已提交
571
			implements CompletionHandler<Integer, DataBuffer> {
572

A
Arjen Poutsma 已提交
573
		private final AsynchronousFileChannel channel;
574

A
Arjen Poutsma 已提交
575
		private final FluxSink<DataBuffer> sink;
576 577 578

		private final DataBufferFactory dataBufferFactory;

A
Arjen Poutsma 已提交
579 580
		private final int bufferSize;

581 582 583 584
		private final AtomicLong position;

		private final AtomicBoolean disposed = new AtomicBoolean();

585

A
Arjen Poutsma 已提交
586 587 588 589
		private AsynchronousFileChannelReadCompletionHandler(
				AsynchronousFileChannel channel, FluxSink<DataBuffer> sink,
				long position, DataBufferFactory dataBufferFactory, int bufferSize) {
			this.channel = channel;
590
			this.sink = sink;
A
Arjen Poutsma 已提交
591
			this.position = new AtomicLong(position);
592
			this.dataBufferFactory = dataBufferFactory;
A
Arjen Poutsma 已提交
593
			this.bufferSize = bufferSize;
594 595 596
		}

		@Override
A
Arjen Poutsma 已提交
597
		public void completed(Integer read, DataBuffer dataBuffer) {
598
			if (read != -1) {
A
Arjen Poutsma 已提交
599 600 601
				long pos = this.position.addAndGet(read);
				dataBuffer.writePosition(read);
				this.sink.next(dataBuffer);
602

603
				if (!this.disposed.get()) {
A
Arjen Poutsma 已提交
604 605 606 607
					DataBuffer newDataBuffer =
							this.dataBufferFactory.allocateBuffer(this.bufferSize);
					ByteBuffer newByteBuffer = newDataBuffer.asByteBuffer(0, this.bufferSize);
					this.channel.read(newByteBuffer, pos, newDataBuffer, this);
608 609 610
				}
			}
			else {
A
Arjen Poutsma 已提交
611
				release(dataBuffer);
612
				this.sink.complete();
613 614 615 616
			}
		}

		@Override
A
Arjen Poutsma 已提交
617 618
		public void failed(Throwable exc, DataBuffer dataBuffer) {
			release(dataBuffer);
619
			this.sink.error(exc);
620
		}
621

622
	}
623

624

625 626 627 628
	private static class AsynchronousFileChannelWriteCompletionHandler
			extends BaseSubscriber<DataBuffer>
			implements CompletionHandler<Integer, ByteBuffer> {

629
		private final FluxSink<DataBuffer> sink;
630 631 632

		private final AsynchronousFileChannel channel;

633 634
		private final AtomicBoolean completed = new AtomicBoolean();

635
		private final AtomicLong position;
636 637 638 639 640

		@Nullable
		private DataBuffer dataBuffer;

		public AsynchronousFileChannelWriteCompletionHandler(
641
				FluxSink<DataBuffer> sink, AsynchronousFileChannel channel, long position) {
642 643
			this.sink = sink;
			this.channel = channel;
644
			this.position = new AtomicLong(position);
645 646 647 648 649 650 651 652 653 654 655 656
		}

		@Override
		protected void hookOnSubscribe(Subscription subscription) {
			request(1);
		}

		@Override
		protected void hookOnNext(DataBuffer value) {
			this.dataBuffer = value;
			ByteBuffer byteBuffer = value.asByteBuffer();

657
			this.channel.write(byteBuffer, this.position.get(), byteBuffer, this);
658 659 660 661 662 663 664 665 666
		}

		@Override
		protected void hookOnError(Throwable throwable) {
			this.sink.error(throwable);
		}

		@Override
		protected void hookOnComplete() {
667
			this.completed.set(true);
668 669 670 671

			if (this.dataBuffer == null) {
				this.sink.complete();
			}
672 673 674 675
		}

		@Override
		public void completed(Integer written, ByteBuffer byteBuffer) {
A
Arjen Poutsma 已提交
676
			long pos = this.position.addAndGet(written);
677
			if (byteBuffer.hasRemaining()) {
A
Arjen Poutsma 已提交
678
				this.channel.write(byteBuffer, pos, byteBuffer, this);
679 680
				return;
			}
681 682

			if (this.dataBuffer != null) {
683
				this.sink.next(this.dataBuffer);
684
				this.dataBuffer = null;
685 686 687
			}
			if (this.completed.get()) {
				this.sink.complete();
688 689 690 691 692 693 694 695 696 697 698
			}
			else {
				request(1);
			}
		}

		@Override
		public void failed(Throwable exc, ByteBuffer byteBuffer) {
			this.sink.error(exc);
		}
	}
A
Arjen Poutsma 已提交
699
}