DataBufferUtils.java 20.8 KB
Newer Older
A
Arjen Poutsma 已提交
1
/*
2
 * Copyright 2002-2018 the original author or authors.
A
Arjen Poutsma 已提交
3 4 5 6 7 8 9 10 11 12 13 14 15 16
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

17
package org.springframework.core.io.buffer;
A
Arjen Poutsma 已提交
18

19
import java.io.File;
A
Arjen Poutsma 已提交
20
import java.io.IOException;
A
Arjen Poutsma 已提交
21
import java.io.InputStream;
22
import java.io.OutputStream;
A
Arjen Poutsma 已提交
23
import java.nio.ByteBuffer;
24 25
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.Channel;
A
Arjen Poutsma 已提交
26
import java.nio.channels.Channels;
27
import java.nio.channels.CompletionHandler;
A
Arjen Poutsma 已提交
28
import java.nio.channels.ReadableByteChannel;
29
import java.nio.channels.WritableByteChannel;
30
import java.nio.file.StandardOpenOption;
31
import java.util.concurrent.atomic.AtomicBoolean;
A
Arjen Poutsma 已提交
32
import java.util.concurrent.atomic.AtomicLong;
33
import java.util.function.BiFunction;
34
import java.util.function.BinaryOperator;
35
import java.util.function.Consumer;
A
Arjen Poutsma 已提交
36 37

import org.reactivestreams.Publisher;
38 39
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
A
Arjen Poutsma 已提交
40
import reactor.core.publisher.Flux;
41
import reactor.core.publisher.FluxSink;
A
Arjen Poutsma 已提交
42
import reactor.core.publisher.Mono;
43
import reactor.core.publisher.SynchronousSink;
A
Arjen Poutsma 已提交
44

45
import org.springframework.core.io.Resource;
46
import org.springframework.lang.Nullable;
A
Arjen Poutsma 已提交
47 48
import org.springframework.util.Assert;

B
Brian Clozel 已提交
49
/**
50 51
 * Utility class for working with {@link DataBuffer}s.
 *
A
Arjen Poutsma 已提交
52
 * @author Arjen Poutsma
B
Brian Clozel 已提交
53
 * @author Brian Clozel
54
 * @since 5.0
A
Arjen Poutsma 已提交
55 56 57
 */
public abstract class DataBufferUtils {

58 59
	private static final Consumer<DataBuffer> RELEASE_CONSUMER = DataBufferUtils::release;

60 61 62 63 64 65 66
	private static final BinaryOperator<DataBuffer> WRITE_AGGREGATOR =
			(dataBuffer1, dataBuffer2) -> {
				DataBuffer result = dataBuffer1.write(dataBuffer2);
				release(dataBuffer2);
				return result;
			};

67 68 69 70
	//---------------------------------------------------------------------
	// Reading
	//---------------------------------------------------------------------

A
Arjen Poutsma 已提交
71
	/**
72
	 * Read the given {@code InputStream} into a {@code Flux} of
A
Arjen Poutsma 已提交
73
	 * {@code DataBuffer}s. Closes the input stream when the flux is terminated.
A
Arjen Poutsma 已提交
74
	 * @param inputStream the input stream to read from
75
	 * @param dataBufferFactory the factory to create data buffers with
A
Arjen Poutsma 已提交
76 77 78
	 * @param bufferSize the maximum size of the data buffers
	 * @return a flux of data buffers read from the given channel
	 */
A
Arjen Poutsma 已提交
79
	public static Flux<DataBuffer> read(InputStream inputStream,
80
			DataBufferFactory dataBufferFactory, int bufferSize) {
81 82

		Assert.notNull(inputStream, "InputStream must not be null");
A
Arjen Poutsma 已提交
83

A
Arjen Poutsma 已提交
84
		ReadableByteChannel channel = Channels.newChannel(inputStream);
85
		return read(channel, dataBufferFactory, bufferSize);
A
Arjen Poutsma 已提交
86 87 88
	}

	/**
89
	 * Read the given {@code ReadableByteChannel} into a {@code Flux} of
A
Arjen Poutsma 已提交
90 91
	 * {@code DataBuffer}s. Closes the channel when the flux is terminated.
	 * @param channel the channel to read from
92
	 * @param dataBufferFactory the factory to create data buffers with
A
Arjen Poutsma 已提交
93 94 95
	 * @param bufferSize the maximum size of the data buffers
	 * @return a flux of data buffers read from the given channel
	 */
A
Arjen Poutsma 已提交
96
	public static Flux<DataBuffer> read(ReadableByteChannel channel,
97
			DataBufferFactory dataBufferFactory, int bufferSize) {
98 99 100

		Assert.notNull(channel, "ReadableByteChannel must not be null");
		Assert.notNull(dataBufferFactory, "DataBufferFactory must not be null");
A
Arjen Poutsma 已提交
101
		Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
A
Arjen Poutsma 已提交
102

103
		return Flux.generate(() -> channel,
104
				new ReadableByteChannelGenerator(dataBufferFactory, bufferSize),
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
				DataBufferUtils::closeChannel);
	}

	/**
	 * Read the given {@code AsynchronousFileChannel} into a {@code Flux} of
	 * {@code DataBuffer}s. Closes the channel when the flux is terminated.
	 * @param channel the channel to read from
	 * @param dataBufferFactory the factory to create data buffers with
	 * @param bufferSize the maximum size of the data buffers
	 * @return a flux of data buffers read from the given channel
	 */
	public static Flux<DataBuffer> read(AsynchronousFileChannel channel,
			DataBufferFactory dataBufferFactory, int bufferSize) {
		return read(channel, 0, dataBufferFactory, bufferSize);
	}

	/**
	 * Read the given {@code AsynchronousFileChannel} into a {@code Flux} of
	 * {@code DataBuffer}s, starting at the given position. Closes the channel when the flux is
	 * terminated.
	 * @param channel the channel to read from
	 * @param position the position to start reading from
	 * @param dataBufferFactory the factory to create data buffers with
	 * @param bufferSize the maximum size of the data buffers
	 * @return a flux of data buffers read from the given channel
	 */
	public static Flux<DataBuffer> read(AsynchronousFileChannel channel,
			long position, DataBufferFactory dataBufferFactory, int bufferSize) {

		Assert.notNull(channel, "'channel' must not be null");
		Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
		Assert.isTrue(position >= 0, "'position' must be >= 0");
A
Arjen Poutsma 已提交
137
		Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
138

A
Arjen Poutsma 已提交
139 140
		DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(bufferSize);
		ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
141

142 143
		return Flux.create(sink -> {
			sink.onDispose(() -> closeChannel(channel));
A
Arjen Poutsma 已提交
144 145 146 147
			CompletionHandler<Integer, DataBuffer> completionHandler =
					new AsynchronousFileChannelReadCompletionHandler(channel, sink, position,
							dataBufferFactory, bufferSize);
			channel.read(byteBuffer, position, dataBuffer, completionHandler);
148 149 150
		});
	}

151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
	/**
	 * Read the given {@code Resource} into a {@code Flux} of {@code DataBuffer}s.
	 * <p>If the resource is a file, it is read into an
	 * {@code AsynchronousFileChannel} and turned to {@code Flux} via
	 * {@link #read(AsynchronousFileChannel, DataBufferFactory, int)} or else
	 * fall back on {@link #read(InputStream, DataBufferFactory, int)} closes
	 * the channel when the flux is terminated.
	 * @param resource the resource to read from
	 * @param dataBufferFactory the factory to create data buffers with
	 * @param bufferSize the maximum size of the data buffers
	 * @return a flux of data buffers read from the given channel
	 */
	public static Flux<DataBuffer> read(Resource resource,
			DataBufferFactory dataBufferFactory, int bufferSize) {

		return read(resource, 0, dataBufferFactory, bufferSize);
	}

	/**
	 * Read the given {@code Resource} into a {@code Flux} of {@code DataBuffer}s
	 * starting at the given position.
	 * <p>If the resource is a file, it is read into an
	 * {@code AsynchronousFileChannel} and turned to {@code Flux} via
	 * {@link #read(AsynchronousFileChannel, DataBufferFactory, int)} or else
	 * fall back on {@link #read(InputStream, DataBufferFactory, int)}. Closes
	 * the channel when the flux is terminated.
	 * @param resource the resource to read from
	 * @param position the position to start reading from
	 * @param dataBufferFactory the factory to create data buffers with
	 * @param bufferSize the maximum size of the data buffers
	 * @return a flux of data buffers read from the given channel
	 */
	public static Flux<DataBuffer> read(Resource resource, long position,
			DataBufferFactory dataBufferFactory, int bufferSize) {

		try {
			if (resource.isFile()) {
				File file = resource.getFile();
				AsynchronousFileChannel channel =
						AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ);
				return DataBufferUtils.read(channel, position, dataBufferFactory, bufferSize);
			}
		}
		catch (IOException ignore) {
			// fallback to resource.readableChannel(), below
		}

		try {
			ReadableByteChannel channel = resource.readableChannel();
			Flux<DataBuffer> in = DataBufferUtils.read(channel, dataBufferFactory, bufferSize);
			return DataBufferUtils.skipUntilByteCount(in, position);
		}
		catch (IOException ex) {
			return Flux.error(ex);
		}
	}


209 210 211 212
	//---------------------------------------------------------------------
	// Writing
	//---------------------------------------------------------------------

213 214
	/**
	 * Write the given stream of {@link DataBuffer}s to the given {@code OutputStream}. Does
215 216 217 218 219
	 * <strong>not</strong> close the output stream when the flux is terminated, and does
	 * <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
	 * source. If releasing is required, then subscribe to the returned {@code Flux} with a
	 * {@link #releaseConsumer()}.
	 * <p>Note that the writing process does not start until the returned {@code Flux} is subscribed
220 221 222
	 * to.
	 * @param source the stream of data buffers to be written
	 * @param outputStream the output stream to write to
223 224
	 * @return a flux containing the same buffers as in {@code source}, that starts the writing
	 * process when subscribed to, and that publishes any writing errors and the completion signal
225
	 */
226
	public static Flux<DataBuffer> write(Publisher<DataBuffer> source,
227 228 229 230 231 232 233 234 235 236 237
			OutputStream outputStream) {

		Assert.notNull(source, "'source' must not be null");
		Assert.notNull(outputStream, "'outputStream' must not be null");

		WritableByteChannel channel = Channels.newChannel(outputStream);
		return write(source, channel);
	}

	/**
	 * Write the given stream of {@link DataBuffer}s to the given {@code WritableByteChannel}. Does
238 239 240 241 242
	 * <strong>not</strong> close the channel when the flux is terminated, and does
	 * <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
	 * source. If releasing is required, then subscribe to the returned {@code Flux} with a
	 * {@link #releaseConsumer()}.
	 * <p>Note that the writing process does not start until the returned {@code Flux} is subscribed
243 244 245
	 * to.
	 * @param source the stream of data buffers to be written
	 * @param channel the channel to write to
246 247
	 * @return a flux containing the same buffers as in {@code source}, that starts the writing
	 * process when subscribed to, and that publishes any writing errors and the completion signal
248
	 */
249
	public static Flux<DataBuffer> write(Publisher<DataBuffer> source,
250 251 252 253 254 255 256
			WritableByteChannel channel) {

		Assert.notNull(source, "'source' must not be null");
		Assert.notNull(channel, "'channel' must not be null");

		Flux<DataBuffer> flux = Flux.from(source);

257
		return Flux.create(sink ->
258 259 260 261 262 263
				flux.subscribe(dataBuffer -> {
							try {
								ByteBuffer byteBuffer = dataBuffer.asByteBuffer();
								while (byteBuffer.hasRemaining()) {
									channel.write(byteBuffer);
								}
264
								sink.next(dataBuffer);
265 266 267 268 269 270 271
							}
							catch (IOException ex) {
								sink.error(ex);
							}

						},
						sink::error,
272
						sink::complete));
273 274 275 276
	}

	/**
	 * Write the given stream of {@link DataBuffer}s to the given {@code AsynchronousFileChannel}.
277 278 279 280 281
	 * Does <strong>not</strong> close the channel when the flux is terminated, and does
	 * <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
	 * source. If releasing is required, then subscribe to the returned {@code Flux} with a
	 * {@link #releaseConsumer()}.
	 * <p>Note that the writing process does not start until the returned {@code Flux} is subscribed
282 283 284
	 * to.
	 * @param source the stream of data buffers to be written
	 * @param channel the channel to write to
285 286
	 * @return a flux containing the same buffers as in {@code source}, that starts the writing
	 * process when subscribed to, and that publishes any writing errors and the completion signal
287
	 */
288
	public static Flux<DataBuffer> write(Publisher<DataBuffer> source, AsynchronousFileChannel channel,
289 290 291 292 293 294 295 296
			long position) {

		Assert.notNull(source, "'source' must not be null");
		Assert.notNull(channel, "'channel' must not be null");
		Assert.isTrue(position >= 0, "'position' must be >= 0");

		Flux<DataBuffer> flux = Flux.from(source);

297
		return Flux.create(sink -> {
298 299 300 301 302 303 304
			BaseSubscriber<DataBuffer> subscriber =
					new AsynchronousFileChannelWriteCompletionHandler(sink, channel, position);
			flux.subscribe(subscriber);
		});
	}


305
	private static void closeChannel(@Nullable Channel channel) {
306 307 308 309 310 311 312
		try {
			if (channel != null) {
				channel.close();
			}
		}
		catch (IOException ignored) {
		}
A
Arjen Poutsma 已提交
313 314
	}

315 316 317 318
	//---------------------------------------------------------------------
	// Various
	//---------------------------------------------------------------------

A
Arjen Poutsma 已提交
319
	/**
320 321 322
	 * Relay buffers from the given {@link Publisher} until the total
	 * {@linkplain DataBuffer#readableByteCount() byte count} reaches
	 * the given maximum byte count, or until the publisher is complete.
A
Arjen Poutsma 已提交
323 324 325 326
	 * @param publisher the publisher to filter
	 * @param maxByteCount the maximum byte count
	 * @return a flux whose maximum byte count is {@code maxByteCount}
	 */
327 328
	public static Flux<DataBuffer> takeUntilByteCount(Publisher<DataBuffer> publisher, long maxByteCount) {
		Assert.notNull(publisher, "Publisher must not be null");
A
Arjen Poutsma 已提交
329
		Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number");
330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350
		AtomicLong byteCountDown = new AtomicLong(maxByteCount);

		return Flux.from(publisher).
				takeWhile(dataBuffer -> {
					int delta = -dataBuffer.readableByteCount();
					long currentCount = byteCountDown.getAndAdd(delta);
					return currentCount >= 0;
				}).
				map(dataBuffer -> {
					long currentCount = byteCountDown.get();
					if (currentCount >= 0) {
						return dataBuffer;
					}
					else {
						// last buffer
						int size = (int) (currentCount + dataBuffer.readableByteCount());
						return dataBuffer.slice(0, size);
					}
				});
	}

B
Brian Clozel 已提交
351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386
	/**
	 * Skip buffers from the given {@link Publisher} until the total
	 * {@linkplain DataBuffer#readableByteCount() byte count} reaches
	 * the given maximum byte count, or until the publisher is complete.
	 * @param publisher the publisher to filter
	 * @param maxByteCount the maximum byte count
	 * @return a flux with the remaining part of the given publisher
	 */
	public static Flux<DataBuffer> skipUntilByteCount(Publisher<DataBuffer> publisher, long maxByteCount) {
		Assert.notNull(publisher, "Publisher must not be null");
		Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number");
		AtomicLong byteCountDown = new AtomicLong(maxByteCount);

		return Flux.from(publisher).
				skipUntil(dataBuffer -> {
					int delta = -dataBuffer.readableByteCount();
					long currentCount = byteCountDown.addAndGet(delta);
					if(currentCount < 0) {
						return true;
					} else {
						DataBufferUtils.release(dataBuffer);
						return false;
					}
				}).
				map(dataBuffer -> {
					long currentCount = byteCountDown.get();
					// slice first buffer, then let others flow through
					if (currentCount < 0) {
						int skip = (int) (currentCount + dataBuffer.readableByteCount());
						byteCountDown.set(0);
						return dataBuffer.slice(skip, dataBuffer.readableByteCount() - skip);
					}
					return dataBuffer;
				});
	}

387
	/**
388
	 * Retain the given data buffer, it it is a {@link PooledDataBuffer}.
389 390 391 392 393 394 395 396 397 398 399
	 * @param dataBuffer the data buffer to retain
	 * @return the retained buffer
	 */
	@SuppressWarnings("unchecked")
	public static <T extends DataBuffer> T retain(T dataBuffer) {
		if (dataBuffer instanceof PooledDataBuffer) {
			return (T) ((PooledDataBuffer) dataBuffer).retain();
		}
		else {
			return dataBuffer;
		}
A
Arjen Poutsma 已提交
400 401
	}

A
Arjen Poutsma 已提交
402
	/**
403
	 * Release the given data buffer, if it is a {@link PooledDataBuffer}.
A
Arjen Poutsma 已提交
404 405 406
	 * @param dataBuffer the data buffer to release
	 * @return {@code true} if the buffer was released; {@code false} otherwise.
	 */
407
	public static boolean release(@Nullable DataBuffer dataBuffer) {
A
Arjen Poutsma 已提交
408 409 410 411 412 413
		if (dataBuffer instanceof PooledDataBuffer) {
			return ((PooledDataBuffer) dataBuffer).release();
		}
		return false;
	}

414
	/**
415
	 * Return a consumer that calls {@link #release(DataBuffer)} on all
416 417 418 419 420 421
	 * passed data buffers.
	 */
	public static Consumer<DataBuffer> releaseConsumer() {
		return RELEASE_CONSUMER;
	}

A
Arjen Poutsma 已提交
422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442
	/**
	 * Composes the buffers in the given {@link Publisher} into a single data buffer. Depending on
	 * the {@code DataBuffer} implementation, the returned buffer may be a single buffer containing
	 * all data of the provided buffers, or it may be a true composite that contains references to
	 * the buffers.
	 * @param publisher the data buffers that are to be composed
	 * @return the composed data buffer
	 */
	public static Mono<DataBuffer> compose(Publisher<DataBuffer> publisher) {
		Assert.notNull(publisher, "'publisher' must not be null");

		Flux<DataBuffer> source = Flux.from(publisher);

		return source.collectList()
				.filter(dataBuffers -> !dataBuffers.isEmpty())
				.map(dataBuffers -> {
					DataBufferFactory bufferFactory = dataBuffers.get(0).factory();
					return bufferFactory.compose(dataBuffers);
				});
	}

443

444
	private static class ReadableByteChannelGenerator
445
			implements BiFunction<ReadableByteChannel, SynchronousSink<DataBuffer>, ReadableByteChannel> {
A
Arjen Poutsma 已提交
446

447
		private final DataBufferFactory dataBufferFactory;
A
Arjen Poutsma 已提交
448

A
Arjen Poutsma 已提交
449
		private final int bufferSize;
A
Arjen Poutsma 已提交
450

A
Arjen Poutsma 已提交
451
		public ReadableByteChannelGenerator(DataBufferFactory dataBufferFactory, int bufferSize) {
452
			this.dataBufferFactory = dataBufferFactory;
A
Arjen Poutsma 已提交
453
			this.bufferSize = bufferSize;
A
Arjen Poutsma 已提交
454 455 456
		}

		@Override
A
Arjen Poutsma 已提交
457 458 459 460
		public ReadableByteChannel apply(ReadableByteChannel channel,
				SynchronousSink<DataBuffer> sub) {
			boolean release = true;
			DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize);
A
Arjen Poutsma 已提交
461 462
			try {
				int read;
A
Arjen Poutsma 已提交
463 464 465 466 467
				ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, dataBuffer.capacity());
				if ((read = channel.read(byteBuffer)) >= 0) {
					dataBuffer.writePosition(read);
					release = false;
					sub.next(dataBuffer);
A
Arjen Poutsma 已提交
468 469
				}
				else {
S
Stephane Maldini 已提交
470
					sub.complete();
A
Arjen Poutsma 已提交
471 472 473
				}
			}
			catch (IOException ex) {
474
				sub.error(ex);
A
Arjen Poutsma 已提交
475
			}
A
Arjen Poutsma 已提交
476 477 478 479 480
			finally {
				if (release) {
					release(dataBuffer);
				}
			}
481
			return channel;
A
Arjen Poutsma 已提交
482 483 484
		}
	}

485

486
	private static class AsynchronousFileChannelReadCompletionHandler
A
Arjen Poutsma 已提交
487
			implements CompletionHandler<Integer, DataBuffer> {
488

A
Arjen Poutsma 已提交
489
		private final AsynchronousFileChannel channel;
490

A
Arjen Poutsma 已提交
491
		private final FluxSink<DataBuffer> sink;
492 493 494

		private final DataBufferFactory dataBufferFactory;

A
Arjen Poutsma 已提交
495 496 497
		private final int bufferSize;

		private AtomicLong position;
498

A
Arjen Poutsma 已提交
499 500 501 502
		private AsynchronousFileChannelReadCompletionHandler(
				AsynchronousFileChannel channel, FluxSink<DataBuffer> sink,
				long position, DataBufferFactory dataBufferFactory, int bufferSize) {
			this.channel = channel;
503
			this.sink = sink;
A
Arjen Poutsma 已提交
504
			this.position = new AtomicLong(position);
505
			this.dataBufferFactory = dataBufferFactory;
A
Arjen Poutsma 已提交
506
			this.bufferSize = bufferSize;
507 508 509
		}

		@Override
A
Arjen Poutsma 已提交
510
		public void completed(Integer read, DataBuffer dataBuffer) {
511
			if (read != -1) {
A
Arjen Poutsma 已提交
512 513 514
				long pos = this.position.addAndGet(read);
				dataBuffer.writePosition(read);
				this.sink.next(dataBuffer);
515

516
				if (!this.sink.isCancelled()) {
A
Arjen Poutsma 已提交
517 518 519 520
					DataBuffer newDataBuffer =
							this.dataBufferFactory.allocateBuffer(this.bufferSize);
					ByteBuffer newByteBuffer = newDataBuffer.asByteBuffer(0, this.bufferSize);
					this.channel.read(newByteBuffer, pos, newDataBuffer, this);
521 522 523
				}
			}
			else {
A
Arjen Poutsma 已提交
524 525
				release(dataBuffer);
				closeChannel(this.channel);
526
				this.sink.complete();
527 528 529 530
			}
		}

		@Override
A
Arjen Poutsma 已提交
531 532 533
		public void failed(Throwable exc, DataBuffer dataBuffer) {
			release(dataBuffer);
			closeChannel(this.channel);
534
			this.sink.error(exc);
535 536
		}
	}
537

538

539 540 541 542
	private static class AsynchronousFileChannelWriteCompletionHandler
			extends BaseSubscriber<DataBuffer>
			implements CompletionHandler<Integer, ByteBuffer> {

543
		private final FluxSink<DataBuffer> sink;
544 545 546

		private final AsynchronousFileChannel channel;

547 548
		private final AtomicBoolean completed = new AtomicBoolean();

549
		private final AtomicLong position;
550 551 552 553 554

		@Nullable
		private DataBuffer dataBuffer;

		public AsynchronousFileChannelWriteCompletionHandler(
555
				FluxSink<DataBuffer> sink, AsynchronousFileChannel channel, long position) {
556 557
			this.sink = sink;
			this.channel = channel;
558
			this.position = new AtomicLong(position);
559 560 561 562 563 564 565 566 567 568 569 570
		}

		@Override
		protected void hookOnSubscribe(Subscription subscription) {
			request(1);
		}

		@Override
		protected void hookOnNext(DataBuffer value) {
			this.dataBuffer = value;
			ByteBuffer byteBuffer = value.asByteBuffer();

571
			this.channel.write(byteBuffer, this.position.get(), byteBuffer, this);
572 573 574 575 576 577 578 579 580
		}

		@Override
		protected void hookOnError(Throwable throwable) {
			this.sink.error(throwable);
		}

		@Override
		protected void hookOnComplete() {
581
			this.completed.set(true);
582 583 584 585

			if (this.dataBuffer == null) {
				this.sink.complete();
			}
586 587 588 589
		}

		@Override
		public void completed(Integer written, ByteBuffer byteBuffer) {
A
Arjen Poutsma 已提交
590
			long pos = this.position.addAndGet(written);
591
			if (byteBuffer.hasRemaining()) {
A
Arjen Poutsma 已提交
592
				this.channel.write(byteBuffer, pos, byteBuffer, this);
593 594
				return;
			}
595 596

			if (this.dataBuffer != null) {
597
				this.sink.next(this.dataBuffer);
598
				this.dataBuffer = null;
599 600 601
			}
			if (this.completed.get()) {
				this.sink.complete();
602 603 604 605 606 607 608 609 610 611 612
			}
			else {
				request(1);
			}
		}

		@Override
		public void failed(Throwable exc, ByteBuffer byteBuffer) {
			this.sink.error(exc);
		}
	}
A
Arjen Poutsma 已提交
613
}