DefaultDataBuffer.java 13.9 KB
Newer Older
A
Arjen Poutsma 已提交
1
/*
2
 * Copyright 2002-2018 the original author or authors.
A
Arjen Poutsma 已提交
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.core.io.buffer;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
22
import java.nio.Buffer;
A
Arjen Poutsma 已提交
23 24
import java.nio.ByteBuffer;
import java.util.Arrays;
25
import java.util.function.IntPredicate;
A
Arjen Poutsma 已提交
26 27 28 29 30

import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;

/**
31
 * Default implementation of the {@link DataBuffer} interface that uses a
A
Arjen Poutsma 已提交
32
 * {@link ByteBuffer} internally, with separate read and write positions.
33
 * Constructed using the {@link DefaultDataBufferFactory}.
A
Arjen Poutsma 已提交
34
 *
J
Juergen Hoeller 已提交
35 36
 * <p>Inspired by Netty's {@code ByteBuf}. Introduced so that non-Netty runtimes
 * (i.e. Servlet) do not require Netty on the classpath.
A
Arjen Poutsma 已提交
37
 *
A
Arjen Poutsma 已提交
38
 * @author Arjen Poutsma
39
 * @author Juergen Hoeller
40
 * @since 5.0
41
 * @see DefaultDataBufferFactory
A
Arjen Poutsma 已提交
42 43 44
 */
public class DefaultDataBuffer implements DataBuffer {

45 46 47 48 49
	/** Upper bound that {@code calculateCapacity} never exceeds. */
	private static final int MAX_CAPACITY = Integer.MAX_VALUE;

	/**
	 * 4 MiB. Below this size {@code calculateCapacity} doubles from 64;
	 * above it, capacity grows in steps of this size.
	 */
	private static final int CAPACITY_THRESHOLD = 1024 * 1024 * 4;


50
	/** Factory returned by {@code factory()} and handed on to slices of this buffer. */
	private final DefaultDataBufferFactory dataBufferFactory;
51

A
Arjen Poutsma 已提交
52 53
	/** Backing storage; swapped for a newly allocated buffer when the capacity changes. */
	private ByteBuffer byteBuffer;

J
Juergen Hoeller 已提交
54 55
	/** The backing buffer's {@code remaining()} count, captured whenever the backing buffer is set. */
	private int capacity;

A
Arjen Poutsma 已提交
56 57 58 59
	/** Index of the next byte returned by {@code read()}. */
	private int readPosition;

	/** Index at which the next written byte is stored; also the end of the readable region. */
	private int writePosition;

60

A
Arjen Poutsma 已提交
61
	/**
	 * Create a data buffer backed by a slice of the given {@code ByteBuffer}.
	 * The capacity is the number of bytes remaining in that slice; both
	 * positions start at zero.
	 * @param dataBufferFactory the factory this buffer reports via {@link #factory()}
	 * @param byteBuffer the buffer to slice
	 */
	private DefaultDataBuffer(DefaultDataBufferFactory dataBufferFactory, ByteBuffer byteBuffer) {
		Assert.notNull(dataBufferFactory, "DefaultDataBufferFactory must not be null");
		Assert.notNull(byteBuffer, "ByteBuffer must not be null");
		this.dataBufferFactory = dataBufferFactory;
		this.byteBuffer = byteBuffer.slice();
		this.capacity = this.byteBuffer.remaining();
	}

J
Juergen Hoeller 已提交
70
	static DefaultDataBuffer fromFilledByteBuffer(DefaultDataBufferFactory dataBufferFactory, ByteBuffer byteBuffer) {
A
Arjen Poutsma 已提交
71 72 73 74
		DefaultDataBuffer dataBuffer = new DefaultDataBuffer(dataBufferFactory, byteBuffer);
		dataBuffer.writePosition(byteBuffer.remaining());
		return dataBuffer;
	}
A
Arjen Poutsma 已提交
75

J
Juergen Hoeller 已提交
76
	static DefaultDataBuffer fromEmptyByteBuffer(DefaultDataBufferFactory dataBufferFactory, ByteBuffer byteBuffer) {
A
Arjen Poutsma 已提交
77
		return new DefaultDataBuffer(dataBufferFactory, byteBuffer);
78 79
	}

A
Arjen Poutsma 已提交
80 81 82 83 84 85 86 87 88

	/**
	 * Directly exposes the native {@code ByteBuffer} that this buffer is based on.
	 * <p>The returned object is the internal instance itself, not a copy or duplicate.
	 * @return the wrapped byte buffer
	 */
	public ByteBuffer getNativeBuffer() {
		return this.byteBuffer;
	}

A
Arjen Poutsma 已提交
89 90 91 92 93
	/**
	 * Replace the backing buffer and recompute the capacity from the new
	 * buffer's {@code remaining()} count.
	 * @param byteBuffer the new backing buffer
	 */
	private void setNativeBuffer(ByteBuffer byteBuffer) {
		this.byteBuffer = byteBuffer;
		this.capacity = byteBuffer.remaining();
	}

J
Juergen Hoeller 已提交
94

95 96 97 98 99
	/**
	 * Return the factory that was supplied when this buffer was constructed.
	 */
	@Override
	public DefaultDataBufferFactory factory() {
		return this.dataBufferFactory;
	}

A
Arjen Poutsma 已提交
100
	@Override
A
Arjen Poutsma 已提交
101 102 103 104 105 106 107 108 109 110
	public int indexOf(IntPredicate predicate, int fromIndex) {
		Assert.notNull(predicate, "'predicate' must not be null");

		if (fromIndex < 0) {
			fromIndex = 0;
		}
		else if (fromIndex >= this.writePosition) {
			return -1;
		}
		for (int i = fromIndex; i < this.writePosition; i++) {
111 112 113 114 115 116 117 118 119
			byte b = this.byteBuffer.get(i);
			if (predicate.test(b)) {
				return i;
			}
		}
		return -1;
	}

	@Override
A
Arjen Poutsma 已提交
120 121 122 123
	public int lastIndexOf(IntPredicate predicate, int fromIndex) {
		Assert.notNull(predicate, "'predicate' must not be null");
		int i = Math.min(fromIndex, this.writePosition - 1);
		for (; i >= 0; i--) {
124 125 126 127 128 129
			byte b = this.byteBuffer.get(i);
			if (predicate.test(b)) {
				return i;
			}
		}
		return -1;
A
Arjen Poutsma 已提交
130 131 132 133 134 135 136
	}

	/**
	 * Return the number of bytes between the read position and the write position.
	 */
	@Override
	public int readableByteCount() {
		return this.writePosition - this.readPosition;
	}

A
Arjen Poutsma 已提交
137 138 139 140 141 142 143 144 145 146 147
	/**
	 * Return the number of bytes between the write position and the current capacity.
	 */
	@Override
	public int writableByteCount() {
		return this.capacity - this.writePosition;
	}

	/**
	 * Return the current read position.
	 */
	@Override
	public int readPosition() {
		return this.readPosition;
	}

	@Override
148
	public DefaultDataBuffer readPosition(int readPosition) {
A
Arjen Poutsma 已提交
149 150 151 152 153 154 155 156 157 158 159 160 161 162
		assertIndex(readPosition >= 0, "'readPosition' %d must be >= 0", readPosition);
		assertIndex(readPosition <= this.writePosition, "'readPosition' %d must be <= %d",
				readPosition, this.writePosition);

		this.readPosition = readPosition;
		return this;
	}

	/**
	 * Return the current write position.
	 */
	@Override
	public int writePosition() {
		return this.writePosition;
	}

	@Override
163
	public DefaultDataBuffer writePosition(int writePosition) {
A
Arjen Poutsma 已提交
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
		assertIndex(writePosition >= this.readPosition, "'writePosition' %d must be >= %d",
				writePosition, this.readPosition);
		assertIndex(writePosition <= this.capacity, "'writePosition' %d must be <= %d",
				writePosition, this.capacity);

		this.writePosition = writePosition;
		return this;
	}

	/**
	 * Return the current capacity in bytes.
	 */
	@Override
	public int capacity() {
		return this.capacity;
	}

	@Override
179
	public DefaultDataBuffer capacity(int newCapacity) {
A
Arjen Poutsma 已提交
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
		Assert.isTrue(newCapacity > 0,
				String.format("'newCapacity' %d must be higher than 0", newCapacity));

		int readPosition = readPosition();
		int writePosition = writePosition();
		int oldCapacity = capacity();

		if (newCapacity > oldCapacity) {
			ByteBuffer oldBuffer = this.byteBuffer;
			ByteBuffer newBuffer = allocate(newCapacity, oldBuffer.isDirect());
			((Buffer) oldBuffer).position(0).limit(oldBuffer.capacity());
			((Buffer) newBuffer).position(0).limit(oldBuffer.capacity());
			newBuffer.put(oldBuffer);
			newBuffer.clear();
			setNativeBuffer(newBuffer);
		}
		else if (newCapacity < oldCapacity) {
			ByteBuffer oldBuffer = this.byteBuffer;
			ByteBuffer newBuffer = allocate(newCapacity, oldBuffer.isDirect());
			if (readPosition < newCapacity) {
				if (writePosition > newCapacity) {
					writePosition = newCapacity;
					writePosition(writePosition);
				}
				((Buffer) oldBuffer).position(readPosition).limit(writePosition);
				((Buffer) newBuffer).position(readPosition).limit(writePosition);
				newBuffer.put(oldBuffer);
				newBuffer.clear();
			}
			else {
				readPosition(newCapacity);
				writePosition(newCapacity);
			}
			setNativeBuffer(newBuffer);
		}
		return this;
	}

	/**
	 * Allocate a new {@code ByteBuffer} of the given size.
	 * @param capacity the size of the buffer in bytes
	 * @param direct {@code true} for a direct buffer, {@code false} for a heap buffer
	 * @return the newly allocated buffer
	 */
	private static ByteBuffer allocate(int capacity, boolean direct) {
		if (direct) {
			return ByteBuffer.allocateDirect(capacity);
		}
		return ByteBuffer.allocate(capacity);
	}

222 223 224 225 226 227 228 229 230
	/**
	 * Read the byte at the given absolute index without moving the read position.
	 * @param index the index, between 0 and the last written byte (inclusive)
	 * @return the byte at that index
	 * @throws IndexOutOfBoundsException if the index is outside that range
	 */
	@Override
	public byte getByte(int index) {
		int lastReadable = this.writePosition - 1;
		assertIndex(index >= 0, "index %d must be >= 0", index);
		assertIndex(index <= lastReadable, "index %d must be <= %d", index, lastReadable);

		return this.byteBuffer.get(index);
	}

A
Arjen Poutsma 已提交
231 232
	@Override
	public byte read() {
A
Arjen Poutsma 已提交
233 234 235 236 237 238
		assertIndex(this.readPosition <= this.writePosition - 1, "readPosition %d must be <= %d",
				this.readPosition, this.writePosition - 1);
		int pos = this.readPosition;
		byte b = this.byteBuffer.get(pos);
		this.readPosition = pos + 1;
		return b;
A
Arjen Poutsma 已提交
239 240 241 242 243
	}

	@Override
	public DefaultDataBuffer read(byte[] destination) {
		Assert.notNull(destination, "'destination' must not be null");
A
Arjen Poutsma 已提交
244
		read(destination, 0, destination.length);
A
Arjen Poutsma 已提交
245 246 247 248 249 250
		return this;
	}

	@Override
	public DefaultDataBuffer read(byte[] destination, int offset, int length) {
		Assert.notNull(destination, "'destination' must not be null");
A
Arjen Poutsma 已提交
251 252 253
		assertIndex(this.readPosition <= this.writePosition - length,
				"readPosition %d and length %d should be smaller than writePosition %d",
				this.readPosition, length, this.writePosition);
A
Arjen Poutsma 已提交
254

A
Arjen Poutsma 已提交
255 256 257 258 259 260 261
		ByteBuffer tmp = this.byteBuffer.duplicate();
		int limit = this.readPosition + length;
		((Buffer) tmp).clear().position(this.readPosition).limit(limit);
		tmp.get(destination, offset, length);

		this.readPosition += length;
		return this;
A
Arjen Poutsma 已提交
262 263 264 265
	}

	@Override
	public DefaultDataBuffer write(byte b) {
A
Arjen Poutsma 已提交
266 267 268 269
		ensureCapacity(1);
		int pos = this.writePosition;
		this.byteBuffer.put(pos, b);
		this.writePosition = pos + 1;
A
Arjen Poutsma 已提交
270 271 272 273 274 275
		return this;
	}

	@Override
	public DefaultDataBuffer write(byte[] source) {
		Assert.notNull(source, "'source' must not be null");
A
Arjen Poutsma 已提交
276
		write(source, 0, source.length);
A
Arjen Poutsma 已提交
277 278 279 280 281 282
		return this;
	}

	@Override
	public DefaultDataBuffer write(byte[] source, int offset, int length) {
		Assert.notNull(source, "'source' must not be null");
A
Arjen Poutsma 已提交
283 284 285 286 287 288 289 290
		ensureCapacity(length);

		ByteBuffer tmp = this.byteBuffer.duplicate();
		int limit = this.writePosition + length;
		((Buffer) tmp).clear().position(this.writePosition).limit(limit);
		tmp.put(source, offset, length);

		this.writePosition += length;
A
Arjen Poutsma 已提交
291 292 293 294
		return this;
	}

	@Override
295
	public DefaultDataBuffer write(DataBuffer... buffers) {
A
Arjen Poutsma 已提交
296 297 298 299 300 301 302 303 304 305 306 307
		if (!ObjectUtils.isEmpty(buffers)) {
			ByteBuffer[] byteBuffers =
					Arrays.stream(buffers).map(DataBuffer::asByteBuffer)
							.toArray(ByteBuffer[]::new);
			write(byteBuffers);
		}
		return this;
	}

	@Override
	public DefaultDataBuffer write(ByteBuffer... byteBuffers) {
		Assert.notEmpty(byteBuffers, "'byteBuffers' must not be empty");
A
Arjen Poutsma 已提交
308 309 310
		int capacity = Arrays.stream(byteBuffers).mapToInt(ByteBuffer::remaining).sum();
		ensureCapacity(capacity);
		Arrays.stream(byteBuffers).forEach(this::write);
A
Arjen Poutsma 已提交
311 312 313
		return this;
	}

A
Arjen Poutsma 已提交
314 315 316 317 318 319 320
	private void write(ByteBuffer source) {
		int length = source.remaining();
		ByteBuffer tmp = this.byteBuffer.duplicate();
		int limit = this.writePosition + source.remaining();
		((Buffer) tmp).clear().position(this.writePosition).limit(limit);
		tmp.put(source);
		this.writePosition += length;
321 322 323
	}

	@Override
324
	public DefaultDataBuffer slice(int index, int length) {
A
Arjen Poutsma 已提交
325
		checkIndex(index, length);
326
		int oldPosition = this.byteBuffer.position();
327 328 329
		// Explicit access via Buffer base type for compatibility
		// with covariant return type on JDK 9's ByteBuffer...
		Buffer buffer = this.byteBuffer;
330
		try {
331
			buffer.position(index);
332
			ByteBuffer slice = this.byteBuffer.slice();
333 334
			// Explicit cast for compatibility with covariant return type on JDK 9's ByteBuffer
			((Buffer) slice).limit(length);
A
Arjen Poutsma 已提交
335
			return new SlicedDefaultDataBuffer(slice, this.dataBufferFactory, length);
336 337
		}
		finally {
338
			buffer.position(oldPosition);
339
		}
A
Arjen Poutsma 已提交
340 341 342 343
	}

	@Override
	public ByteBuffer asByteBuffer() {
A
Arjen Poutsma 已提交
344 345 346 347 348 349 350
		return asByteBuffer(this.readPosition, readableByteCount());
	}

	@Override
	public ByteBuffer asByteBuffer(int index, int length) {
		checkIndex(index, length);

A
Arjen Poutsma 已提交
351
		ByteBuffer duplicate = this.byteBuffer.duplicate();
352 353 354
		// Explicit access via Buffer base type for compatibility
		// with covariant return type on JDK 9's ByteBuffer...
		Buffer buffer = duplicate;
A
Arjen Poutsma 已提交
355 356 357
		buffer.position(index);
		buffer.limit(index + length);
		return duplicate.slice();
A
Arjen Poutsma 已提交
358 359 360 361 362 363 364
	}

	/**
	 * Return a new {@code InputStream} that reads from this buffer; reading
	 * from the stream advances this buffer's read position.
	 */
	@Override
	public InputStream asInputStream() {
		return new DefaultDataBufferInputStream();
	}

365 366 367 368 369
	/**
	 * Return a new {@code InputStream} that reads from this buffer.
	 * <p>The {@code releaseOnClose} flag is not consulted by this implementation;
	 * the result is the same as {@link #asInputStream()}.
	 */
	@Override
	public InputStream asInputStream(boolean releaseOnClose) {
		return new DefaultDataBufferInputStream();
	}

A
Arjen Poutsma 已提交
370 371 372 373 374
	/**
	 * Return a new {@code OutputStream} that appends to this buffer; writing
	 * to the stream advances this buffer's write position.
	 */
	@Override
	public OutputStream asOutputStream() {
		return new DefaultDataBufferOutputStream();
	}

A
Arjen Poutsma 已提交
375 376 377
	private void ensureCapacity(int length) {
		if (length <= writableByteCount()) {
			return;
A
Arjen Poutsma 已提交
378
		}
A
Arjen Poutsma 已提交
379 380
		int newCapacity = calculateCapacity(this.writePosition + length);
		capacity(newCapacity);
A
Arjen Poutsma 已提交
381 382
	}

383
	/**
	 * Calculate the capacity to grow to for a given minimum.
	 * <p>Below the 4 MiB threshold the result is the smallest power-of-two
	 * multiple of 64 that fits; at the threshold it is the threshold itself;
	 * above it, the minimum is rounded down to a threshold multiple and one
	 * more threshold step is added, capped at the maximum capacity.
	 * @param neededCapacity the minimum capacity required, must be >= 0
	 * @return the capacity to allocate
	 * @see io.netty.buffer.AbstractByteBufAllocator#calculateNewCapacity(int, int)
	 */
	private int calculateCapacity(int neededCapacity) {
		Assert.isTrue(neededCapacity >= 0, "'neededCapacity' must >= 0");

		if (neededCapacity > CAPACITY_THRESHOLD) {
			// Grow in threshold-sized steps instead of doubling.
			int rounded = neededCapacity / CAPACITY_THRESHOLD * CAPACITY_THRESHOLD;
			if (rounded > MAX_CAPACITY - CAPACITY_THRESHOLD) {
				return MAX_CAPACITY;
			}
			return rounded + CAPACITY_THRESHOLD;
		}
		if (neededCapacity == CAPACITY_THRESHOLD) {
			return CAPACITY_THRESHOLD;
		}
		// Below the threshold: double up from 64 until it fits.
		int candidate = 64;
		while (candidate < neededCapacity) {
			candidate <<= 1;
		}
		return Math.min(candidate, MAX_CAPACITY);
	}

412

A
Arjen Poutsma 已提交
413
	@Override
J
Juergen Hoeller 已提交
414 415
	public boolean equals(Object other) {
		if (this == other) {
A
Arjen Poutsma 已提交
416 417
			return true;
		}
J
Juergen Hoeller 已提交
418
		if (!(other instanceof DefaultDataBuffer)) {
419
			return false;
A
Arjen Poutsma 已提交
420
		}
J
Juergen Hoeller 已提交
421 422 423 424
		DefaultDataBuffer otherBuffer = (DefaultDataBuffer) other;
		return (this.readPosition == otherBuffer.readPosition &&
				this.writePosition == otherBuffer.writePosition &&
				this.byteBuffer.equals(otherBuffer.byteBuffer));
425 426 427 428 429
	}

	@Override
	public int hashCode() {
		return this.byteBuffer.hashCode();
A
Arjen Poutsma 已提交
430 431 432 433
	}

	@Override
	public String toString() {
434
		return String.format("DefaultDataBuffer (r: %d, w: %d, c: %d)",
J
Juergen Hoeller 已提交
435
				this.readPosition, this.writePosition, this.capacity);
A
Arjen Poutsma 已提交
436 437
	}

J
Juergen Hoeller 已提交
438

A
Arjen Poutsma 已提交
439 440 441 442 443
	private void checkIndex(int index, int length) {
		assertIndex(index >= 0, "index %d must be >= 0", index);
		assertIndex(length >= 0, "length %d must be >= 0", index);
		assertIndex(index <= this.capacity, "index %d must be <= %d", index, this.capacity);
		assertIndex(length <= this.capacity, "length %d must be <= %d", index, this.capacity);
A
Arjen Poutsma 已提交
444 445
	}

A
Arjen Poutsma 已提交
446 447 448 449 450 451
	/**
	 * Throw an {@code IndexOutOfBoundsException} with a formatted message when
	 * the given condition does not hold. The message is only formatted on failure.
	 * @param expression the condition that must be {@code true}
	 * @param format the {@link String#format} pattern for the message
	 * @param args the arguments for the pattern
	 */
	private static void assertIndex(boolean expression, String format, Object... args) {
		if (expression) {
			return;
		}
		throw new IndexOutOfBoundsException(String.format(format, args));
	}
452

J
Juergen Hoeller 已提交
453

A
Arjen Poutsma 已提交
454 455 456
	private class DefaultDataBufferInputStream extends InputStream {

		@Override
A
Arjen Poutsma 已提交
457
		public int available() {
458
			return readableByteCount();
A
Arjen Poutsma 已提交
459 460 461 462
		}

		@Override
		public int read() {
A
Arjen Poutsma 已提交
463
			return available() > 0 ? DefaultDataBuffer.this.read() & 0xFF : -1;
A
Arjen Poutsma 已提交
464 465 466 467
		}

		@Override
		public int read(byte[] bytes, int off, int len) throws IOException {
A
Arjen Poutsma 已提交
468 469 470 471 472 473 474 475 476
			int available = available();
			if (available > 0) {
				len = Math.min(len, available);
				DefaultDataBuffer.this.read(bytes, off, len);
				return len;
			}
			else {
				return -1;
			}
A
Arjen Poutsma 已提交
477 478 479
		}
	}

480

A
Arjen Poutsma 已提交
481 482 483 484
	private class DefaultDataBufferOutputStream extends OutputStream {

		@Override
		public void write(int b) throws IOException {
A
Arjen Poutsma 已提交
485
			DefaultDataBuffer.this.write((byte) b);
A
Arjen Poutsma 已提交
486 487 488 489
		}

		@Override
		public void write(byte[] bytes, int off, int len) throws IOException {
A
Arjen Poutsma 已提交
490
			DefaultDataBuffer.this.write(bytes, off, len);
A
Arjen Poutsma 已提交
491 492
		}
	}
493

494

495 496
	private static class SlicedDefaultDataBuffer extends DefaultDataBuffer {

J
Juergen Hoeller 已提交
497
		SlicedDefaultDataBuffer(ByteBuffer byteBuffer, DefaultDataBufferFactory dataBufferFactory, int length) {
A
Arjen Poutsma 已提交
498 499
			super(dataBufferFactory, byteBuffer);
			writePosition(length);
500 501 502
		}

		@Override
503
		public DefaultDataBuffer capacity(int newCapacity) {
J
Juergen Hoeller 已提交
504
			throw new UnsupportedOperationException("Changing the capacity of a sliced buffer is not supported");
505 506
		}
	}
507

A
Arjen Poutsma 已提交
508
}