From c8704ce473b4fb1ab400b4e9deafa456a12c6fe7 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Fri, 19 Jul 2019 11:36:52 +0200 Subject: [PATCH] Introduce DataBufferWrapper This commit introduces the DataBufferWrapper, a wrapper for DataBuffers, and uses it in applicable use cases. --- .../core/codec/StringDecoder.java | 153 +------------ .../core/io/buffer/DataBufferWrapper.java | 213 ++++++++++++++++++ .../core/io/buffer/LeakAwareDataBuffer.java | 188 +--------------- .../io/buffer/LeakAwareDataBufferFactory.java | 2 +- .../reactive/UndertowServerHttpRequest.java | 165 +------------- 5 files changed, 237 insertions(+), 484 deletions(-) create mode 100644 spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferWrapper.java diff --git a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java index b267a54c43..d6d0ccfc27 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java @@ -16,9 +16,6 @@ package org.springframework.core.codec; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @@ -28,15 +25,14 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.function.IntPredicate; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.DataBufferWrapper; import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.core.log.LogFormatUtils; @@ -264,7 +260,7 @@ public final class StringDecoder extends AbstractDataBufferDecoder { } - private static class EndFrameBuffer implements DataBuffer { + private static class EndFrameBuffer extends DataBufferWrapper { private static final DataBuffer BUFFER = new DefaultDataBufferFactory().wrap(new byte[0]); @@ -272,6 +268,7 @@ public final class StringDecoder extends AbstractDataBufferDecoder { public EndFrameBuffer(byte[] delimiter) { + super(BUFFER); this.delimiter = delimiter; } @@ -279,150 +276,6 @@ public final class StringDecoder extends AbstractDataBufferDecoder { return this.delimiter; } - @Override - public DataBufferFactory factory() { - return BUFFER.factory(); - } - - @Override - public int indexOf(IntPredicate predicate, int fromIndex) { - return BUFFER.indexOf(predicate, fromIndex); - } - - @Override - public int lastIndexOf(IntPredicate predicate, int fromIndex) { - return BUFFER.lastIndexOf(predicate, fromIndex); - } - - @Override - public int readableByteCount() { - return BUFFER.readableByteCount(); - } - - @Override - public int writableByteCount() { - return BUFFER.writableByteCount(); - } - - @Override - public int capacity() { - return BUFFER.capacity(); - } - - @Override - public DataBuffer capacity(int capacity) { - return BUFFER.capacity(capacity); - } - - @Override - public DataBuffer ensureCapacity(int capacity) { - return BUFFER.ensureCapacity(capacity); - } - - @Override - public int readPosition() { - return BUFFER.readPosition(); - } - - @Override - public DataBuffer readPosition(int readPosition) { - return BUFFER.readPosition(readPosition); - } - - @Override - public int writePosition() { - return BUFFER.writePosition(); - } - - @Override - public DataBuffer writePosition(int writePosition) { - return BUFFER.writePosition(writePosition); - } - - @Override - public byte getByte(int index) { - return BUFFER.getByte(index); - } - - @Override - public byte read() { - return BUFFER.read(); - } - - @Override - public DataBuffer read(byte[] destination) { - return BUFFER.read(destination); - } - - @Override - public DataBuffer read(byte[] destination, int offset, int length) { - return BUFFER.read(destination, offset, length); - } - - @Override - public DataBuffer write(byte b) { - return BUFFER.write(b); - } - - @Override - public DataBuffer write(byte[] source) { - return BUFFER.write(source); - } - - @Override - public DataBuffer write(byte[] source, int offset, int length) { - return BUFFER.write(source, offset, length); - } - - @Override - public DataBuffer write(DataBuffer... buffers) { - return BUFFER.write(buffers); - } - - @Override - public DataBuffer write(ByteBuffer... buffers) { - return BUFFER.write(buffers); - } - - @Override - public DataBuffer write(CharSequence charSequence, Charset charset) { - return BUFFER.write(charSequence, charset); - } - - @Override - public DataBuffer slice(int index, int length) { - return BUFFER.slice(index, length); - } - - @Override - public DataBuffer retainedSlice(int index, int length) { - return BUFFER.retainedSlice(index, length); - } - - @Override - public ByteBuffer asByteBuffer() { - return BUFFER.asByteBuffer(); - } - - @Override - public ByteBuffer asByteBuffer(int index, int length) { - return BUFFER.asByteBuffer(index, length); - } - - @Override - public InputStream asInputStream() { - return BUFFER.asInputStream(); - } - - @Override - public InputStream asInputStream(boolean releaseOnClose) { - return BUFFER.asInputStream(releaseOnClose); - } - - @Override - public OutputStream asOutputStream() { - return BUFFER.asOutputStream(); - } } diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferWrapper.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferWrapper.java new file mode 100644 index 0000000000..d4084ec63b --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferWrapper.java @@ -0,0 +1,213 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.io.buffer; + +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.function.IntPredicate; + +import org.springframework.util.Assert; + +/** + * Provides a convenient implementation of the {@link DataBuffer} interface + * that can be overridden to adapt the delegate. + * + *

These methods default to calling through to the wrapped delegate object. + * + * @author Arjen Poutsma + * @since 5.2 + */ +public class DataBufferWrapper implements DataBuffer { + + private final DataBuffer delegate; + + + /** + * Create a new {@code DataBufferWrapper} that wraps the given buffer. + * @param delegate the buffer to wrap + */ + public DataBufferWrapper(DataBuffer delegate) { + Assert.notNull(delegate, "Delegate must not be null"); + this.delegate = delegate; + } + + /** + * Return the wrapped delegate. + */ + public DataBuffer dataBuffer() { + return this.delegate; + } + + @Override + public DataBufferFactory factory() { + return this.delegate.factory(); + } + + @Override + public int indexOf(IntPredicate predicate, int fromIndex) { + return this.delegate.indexOf(predicate, fromIndex); + } + + @Override + public int lastIndexOf(IntPredicate predicate, int fromIndex) { + return this.delegate.lastIndexOf(predicate, fromIndex); + } + + @Override + public int readableByteCount() { + return this.delegate.readableByteCount(); + } + + @Override + public int writableByteCount() { + return this.delegate.writableByteCount(); + } + + @Override + public int capacity() { + return this.delegate.capacity(); + } + + @Override + public DataBuffer capacity(int capacity) { + return this.delegate.capacity(capacity); + } + + @Override + public DataBuffer ensureCapacity(int capacity) { + return this.delegate.ensureCapacity(capacity); + } + + @Override + public int readPosition() { + return this.delegate.readPosition(); + } + + @Override + public DataBuffer readPosition(int readPosition) { + return this.delegate.readPosition(readPosition); + } + + @Override + public int writePosition() { + return this.delegate.writePosition(); + } + + @Override + public DataBuffer writePosition(int writePosition) { + return this.delegate.writePosition(writePosition); + } + + @Override + public byte getByte(int index) { + return this.delegate.getByte(index); + } + + @Override + public byte read() { + return this.delegate.read(); + } + + @Override + public DataBuffer read(byte[] destination) { + return this.delegate.read(destination); + } + + @Override + public DataBuffer read(byte[] destination, int offset, int length) { + return this.delegate.read(destination, offset, length); + } + + @Override + public DataBuffer write(byte b) { + return this.delegate.write(b); + } + + @Override + public DataBuffer write(byte[] source) { + return this.delegate.write(source); + } + + @Override + public DataBuffer write(byte[] source, int offset, int length) { + return this.delegate.write(source, offset, length); + } + + @Override + public DataBuffer write(DataBuffer... buffers) { + return this.delegate.write(buffers); + } + + @Override + public DataBuffer write(ByteBuffer... buffers) { + return this.delegate.write(buffers); + } + + @Override + public DataBuffer write(CharSequence charSequence, + Charset charset) { + return this.delegate.write(charSequence, charset); + } + + @Override + public DataBuffer slice(int index, int length) { + return this.delegate.slice(index, length); + } + + @Override + public DataBuffer retainedSlice(int index, int length) { + return this.delegate.retainedSlice(index, length); + } + + @Override + public ByteBuffer asByteBuffer() { + return this.delegate.asByteBuffer(); + } + + @Override + public ByteBuffer asByteBuffer(int index, int length) { + return this.delegate.asByteBuffer(index, length); + } + + @Override + public InputStream asInputStream() { + return this.delegate.asInputStream(); + } + + @Override + public InputStream asInputStream(boolean releaseOnClose) { + return this.delegate.asInputStream(releaseOnClose); + } + + @Override + public OutputStream asOutputStream() { + return this.delegate.asOutputStream(); + } + + @Override + public String toString(Charset charset) { + return this.delegate.toString(charset); + } + + @Override + public String toString(int index, int length, Charset charset) { + return this.delegate.toString(index, length, charset); + } + +} diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBuffer.java b/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBuffer.java index 155dc056b5..80879f82eb 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBuffer.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBuffer.java @@ -16,12 +16,6 @@ package org.springframework.core.io.buffer; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.function.IntPredicate; - import org.springframework.util.Assert; /** @@ -29,9 +23,7 @@ import org.springframework.util.Assert; * * @author Arjen Poutsma */ -class LeakAwareDataBuffer implements PooledDataBuffer { - - private final DataBuffer delegate; +class LeakAwareDataBuffer extends DataBufferWrapper implements PooledDataBuffer { private final AssertionError leakError; @@ -39,9 +31,8 @@ class LeakAwareDataBuffer implements PooledDataBuffer { LeakAwareDataBuffer(DataBuffer delegate, LeakAwareDataBufferFactory dataBufferFactory) { - Assert.notNull(delegate, "Delegate must not be null"); + super(delegate); Assert.notNull(dataBufferFactory, "DataBufferFactory must not be null"); - this.delegate = delegate; this.dataBufferFactory = dataBufferFactory; this.leakError = createLeakError(delegate); } @@ -64,193 +55,38 @@ class LeakAwareDataBuffer implements PooledDataBuffer { } - public DataBuffer getDelegate() { - return this.delegate; - } - @Override public boolean isAllocated() { - return this.delegate instanceof PooledDataBuffer && - ((PooledDataBuffer) this.delegate).isAllocated(); + DataBuffer delegate = dataBuffer(); + return delegate instanceof PooledDataBuffer && + ((PooledDataBuffer) delegate).isAllocated(); } @Override public PooledDataBuffer retain() { - if (this.delegate instanceof PooledDataBuffer) { - ((PooledDataBuffer) this.delegate).retain(); + DataBuffer delegate = dataBuffer(); + if (delegate instanceof PooledDataBuffer) { + ((PooledDataBuffer) delegate).retain(); } return this; } @Override public boolean release() { - if (this.delegate instanceof PooledDataBuffer) { - ((PooledDataBuffer) this.delegate).release(); + DataBuffer delegate = dataBuffer(); + if (delegate instanceof PooledDataBuffer) { + ((PooledDataBuffer) delegate).release(); } return isAllocated(); } - // delegation - - @Override public LeakAwareDataBufferFactory factory() { return this.dataBufferFactory; } - @Override - public int indexOf(IntPredicate predicate, int fromIndex) { - return this.delegate.indexOf(predicate, fromIndex); - } - - @Override - public int lastIndexOf(IntPredicate predicate, int fromIndex) { - return this.delegate.lastIndexOf(predicate, fromIndex); - } - - @Override - public int readableByteCount() { - return this.delegate.readableByteCount(); - } - - @Override - public int writableByteCount() { - return this.delegate.writableByteCount(); - } - - @Override - public int readPosition() { - return this.delegate.readPosition(); - } - - @Override - public DataBuffer readPosition(int readPosition) { - return this.delegate.readPosition(readPosition); - } - - @Override - public int writePosition() { - return this.delegate.writePosition(); - } - - @Override - public DataBuffer writePosition(int writePosition) { - return this.delegate.writePosition(writePosition); - } - - @Override - public int capacity() { - return this.delegate.capacity(); - } - - @Override - public DataBuffer capacity(int newCapacity) { - return this.delegate.capacity(newCapacity); - } - - @Override - public DataBuffer ensureCapacity(int capacity) { - return this.delegate.ensureCapacity(capacity); - } - - @Override - public byte getByte(int index) { - return this.delegate.getByte(index); - } - - @Override - public byte read() { - return this.delegate.read(); - } - - @Override - public DataBuffer read(byte[] destination) { - return this.delegate.read(destination); - } - - @Override - public DataBuffer read(byte[] destination, int offset, int length) { - return this.delegate.read(destination, offset, length); - } - - @Override - public DataBuffer write(byte b) { - return this.delegate.write(b); - } - - @Override - public DataBuffer write(byte[] source) { - return this.delegate.write(source); - } - - @Override - public DataBuffer write(byte[] source, int offset, int length) { - return this.delegate.write(source, offset, length); - } - - @Override - public DataBuffer write(DataBuffer... buffers) { - return this.delegate.write(buffers); - } - - @Override - public DataBuffer write(ByteBuffer... buffers) { - return this.delegate.write(buffers); - } - - @Override - public DataBuffer write(CharSequence charSequence, Charset charset) { - return this.delegate.write(charSequence, charset); - } - - @Override - public DataBuffer slice(int index, int length) { - return this.delegate.slice(index, length); - } - - @Override - public ByteBuffer asByteBuffer() { - return this.delegate.asByteBuffer(); - } - - @Override - public ByteBuffer asByteBuffer(int index, int length) { - return this.delegate.asByteBuffer(index, length); - } - - @Override - public InputStream asInputStream() { - return this.delegate.asInputStream(); - } - - @Override - public InputStream asInputStream(boolean releaseOnClose) { - return this.delegate.asInputStream(releaseOnClose); - } - - @Override - public OutputStream asOutputStream() { - return this.delegate.asOutputStream(); - } - - @Override - public boolean equals(Object o) { - if (o instanceof LeakAwareDataBuffer) { - LeakAwareDataBuffer other = (LeakAwareDataBuffer) o; - return this.delegate.equals(other.delegate); - } - else { - return false; - } - } - - @Override - public int hashCode() { - return this.delegate.hashCode(); - } - @Override public String toString() { - return String.format("LeakAwareDataBuffer (%s)", this.delegate); + return String.format("LeakAwareDataBuffer (%s)", dataBuffer()); } } diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java b/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java index 230d1ac51e..0486b89a03 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java @@ -130,7 +130,7 @@ public class LeakAwareDataBufferFactory implements DataBufferFactory { public DataBuffer join(List dataBuffers) { // Remove LeakAwareDataBuffer wrapper so delegate can find native buffers dataBuffers = dataBuffers.stream() - .map(o -> o instanceof LeakAwareDataBuffer ? ((LeakAwareDataBuffer) o).getDelegate() : o) + .map(o -> o instanceof LeakAwareDataBuffer ? ((LeakAwareDataBuffer) o).dataBuffer() : o) .collect(Collectors.toList()); return new LeakAwareDataBuffer(this.delegate.join(dataBuffers), this); } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java index bd506e219d..50b31b40a3 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java @@ -17,15 +17,11 @@ package org.springframework.http.server.reactive; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; -import java.nio.charset.Charset; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.IntPredicate; import javax.net.ssl.SSLSession; import io.undertow.connector.ByteBufferPool; @@ -38,6 +34,7 @@ import reactor.core.publisher.Flux; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.DataBufferWrapper; import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.http.HttpCookie; import org.springframework.http.HttpHeaders; @@ -206,24 +203,23 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest { } - private static class UndertowDataBuffer implements PooledDataBuffer { - - private final DataBuffer dataBuffer; + private static class UndertowDataBuffer extends DataBufferWrapper implements PooledDataBuffer { private final PooledByteBuffer pooledByteBuffer; private final AtomicInteger refCount; + public UndertowDataBuffer(DataBuffer dataBuffer, PooledByteBuffer pooledByteBuffer) { - this.dataBuffer = dataBuffer; + super(dataBuffer); this.pooledByteBuffer = pooledByteBuffer; this.refCount = new AtomicInteger(1); } private UndertowDataBuffer(DataBuffer dataBuffer, PooledByteBuffer pooledByteBuffer, AtomicInteger refCount) { + super(dataBuffer); this.refCount = refCount; - this.dataBuffer = dataBuffer; this.pooledByteBuffer = pooledByteBuffer; } @@ -235,7 +231,7 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest { @Override public PooledDataBuffer retain() { this.refCount.incrementAndGet(); - DataBufferUtils.retain(this.dataBuffer); + DataBufferUtils.retain(dataBuffer()); return this; } @@ -244,7 +240,7 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest { int refCount = this.refCount.decrementAndGet(); if (refCount == 0) { try { - return DataBufferUtils.release(this.dataBuffer); + return DataBufferUtils.release(dataBuffer()); } finally { this.pooledByteBuffer.close(); @@ -253,157 +249,12 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest { return false; } - @Override - public DataBufferFactory factory() { - return this.dataBuffer.factory(); - } - - @Override - public int indexOf(IntPredicate predicate, int fromIndex) { - return this.dataBuffer.indexOf(predicate, fromIndex); - } - - @Override - public int lastIndexOf(IntPredicate predicate, int fromIndex) { - return this.dataBuffer.lastIndexOf(predicate, fromIndex); - } - - @Override - public int readableByteCount() { - return this.dataBuffer.readableByteCount(); - } - - @Override - public int writableByteCount() { - return this.dataBuffer.writableByteCount(); - } - - @Override - public int readPosition() { - return this.dataBuffer.readPosition(); - } - - @Override - public DataBuffer readPosition(int readPosition) { - return this.dataBuffer.readPosition(readPosition); - } - - @Override - public int writePosition() { - return this.dataBuffer.writePosition(); - } - - @Override - public DataBuffer writePosition(int writePosition) { - this.dataBuffer.writePosition(writePosition); - return this; - } - - @Override - public int capacity() { - return this.dataBuffer.capacity(); - } - - @Override - public DataBuffer capacity(int newCapacity) { - this.dataBuffer.capacity(newCapacity); - return this; - } - - @Override - public DataBuffer ensureCapacity(int capacity) { - this.dataBuffer.ensureCapacity(capacity); - return this; - } - - @Override - public byte getByte(int index) { - return this.dataBuffer.getByte(index); - } - - @Override - public byte read() { - return this.dataBuffer.read(); - } - - @Override - public DataBuffer read(byte[] destination) { - this.dataBuffer.read(destination); - return this; - } - - @Override - public DataBuffer read(byte[] destination, int offset, int length) { - this.dataBuffer.read(destination, offset, length); - return this; - } - - @Override - public DataBuffer write(byte b) { - this.dataBuffer.write(b); - return this; - } - - @Override - public DataBuffer write(byte[] source) { - this.dataBuffer.write(source); - return this; - } - - @Override - public DataBuffer write(byte[] source, int offset, int length) { - this.dataBuffer.write(source, offset, length); - return this; - } - - @Override - public DataBuffer write(DataBuffer... buffers) { - this.dataBuffer.write(buffers); - return this; - } - - @Override - public DataBuffer write(ByteBuffer... byteBuffers) { - this.dataBuffer.write(byteBuffers); - return this; - } - - @Override - public DataBuffer write(CharSequence charSequence, Charset charset) { - this.dataBuffer.write(charSequence, charset); - return this; - } - @Override public DataBuffer slice(int index, int length) { - DataBuffer slice = this.dataBuffer.slice(index, length); + DataBuffer slice = dataBuffer().slice(index, length); return new UndertowDataBuffer(slice, this.pooledByteBuffer, this.refCount); } - @Override - public ByteBuffer asByteBuffer() { - return this.dataBuffer.asByteBuffer(); - } - - @Override - public ByteBuffer asByteBuffer(int index, int length) { - return this.dataBuffer.asByteBuffer(index, length); - } - - @Override - public InputStream asInputStream() { - return this.dataBuffer.asInputStream(); - } - - @Override - public InputStream asInputStream(boolean releaseOnClose) { - return this.dataBuffer.asInputStream(releaseOnClose); - } - - @Override - public OutputStream asOutputStream() { - return this.dataBuffer.asOutputStream(); - } } } -- GitLab