提交 c8704ce4 编写于 作者: A Arjen Poutsma

Introduce DataBufferWrapper

This commit introduces the DataBufferWrapper, a wrapper for DataBuffers,
and uses it in applicable use cases.
上级 24e96b6c
......@@ -16,9 +16,6 @@
package org.springframework.core.codec;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
......@@ -28,15 +25,14 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.IntPredicate;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DataBufferWrapper;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.core.log.LogFormatUtils;
......@@ -264,7 +260,7 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
}
private static class EndFrameBuffer implements DataBuffer {
private static class EndFrameBuffer extends DataBufferWrapper {
private static final DataBuffer BUFFER = new DefaultDataBufferFactory().wrap(new byte[0]);
......@@ -272,6 +268,7 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
public EndFrameBuffer(byte[] delimiter) {
super(BUFFER);
this.delimiter = delimiter;
}
......@@ -279,150 +276,6 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
return this.delimiter;
}
@Override
public DataBufferFactory factory() {
return BUFFER.factory();
}
@Override
public int indexOf(IntPredicate predicate, int fromIndex) {
return BUFFER.indexOf(predicate, fromIndex);
}
@Override
public int lastIndexOf(IntPredicate predicate, int fromIndex) {
return BUFFER.lastIndexOf(predicate, fromIndex);
}
@Override
public int readableByteCount() {
return BUFFER.readableByteCount();
}
@Override
public int writableByteCount() {
return BUFFER.writableByteCount();
}
@Override
public int capacity() {
return BUFFER.capacity();
}
@Override
public DataBuffer capacity(int capacity) {
return BUFFER.capacity(capacity);
}
@Override
public DataBuffer ensureCapacity(int capacity) {
return BUFFER.ensureCapacity(capacity);
}
@Override
public int readPosition() {
return BUFFER.readPosition();
}
@Override
public DataBuffer readPosition(int readPosition) {
return BUFFER.readPosition(readPosition);
}
@Override
public int writePosition() {
return BUFFER.writePosition();
}
@Override
public DataBuffer writePosition(int writePosition) {
return BUFFER.writePosition(writePosition);
}
@Override
public byte getByte(int index) {
return BUFFER.getByte(index);
}
@Override
public byte read() {
return BUFFER.read();
}
@Override
public DataBuffer read(byte[] destination) {
return BUFFER.read(destination);
}
@Override
public DataBuffer read(byte[] destination, int offset, int length) {
return BUFFER.read(destination, offset, length);
}
@Override
public DataBuffer write(byte b) {
return BUFFER.write(b);
}
@Override
public DataBuffer write(byte[] source) {
return BUFFER.write(source);
}
@Override
public DataBuffer write(byte[] source, int offset, int length) {
return BUFFER.write(source, offset, length);
}
@Override
public DataBuffer write(DataBuffer... buffers) {
return BUFFER.write(buffers);
}
@Override
public DataBuffer write(ByteBuffer... buffers) {
return BUFFER.write(buffers);
}
@Override
public DataBuffer write(CharSequence charSequence, Charset charset) {
return BUFFER.write(charSequence, charset);
}
@Override
public DataBuffer slice(int index, int length) {
return BUFFER.slice(index, length);
}
@Override
public DataBuffer retainedSlice(int index, int length) {
return BUFFER.retainedSlice(index, length);
}
@Override
public ByteBuffer asByteBuffer() {
return BUFFER.asByteBuffer();
}
@Override
public ByteBuffer asByteBuffer(int index, int length) {
return BUFFER.asByteBuffer(index, length);
}
@Override
public InputStream asInputStream() {
return BUFFER.asInputStream();
}
@Override
public InputStream asInputStream(boolean releaseOnClose) {
return BUFFER.asInputStream(releaseOnClose);
}
@Override
public OutputStream asOutputStream() {
return BUFFER.asOutputStream();
}
}
......
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.io.buffer;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.function.IntPredicate;
import org.springframework.util.Assert;
/**
* Provides a convenient implementation of the {@link DataBuffer} interface
* that can be overridden to adapt the delegate.
*
* <p>These methods default to calling through to the wrapped delegate object.
*
* @author Arjen Poutsma
* @since 5.2
*/
public class DataBufferWrapper implements DataBuffer {
private final DataBuffer delegate;
/**
* Create a new {@code DataBufferWrapper} that wraps the given buffer.
* @param delegate the buffer to wrap
*/
public DataBufferWrapper(DataBuffer delegate) {
Assert.notNull(delegate, "Delegate must not be null");
this.delegate = delegate;
}
/**
* Return the wrapped delegate.
*/
public DataBuffer dataBuffer() {
return this.delegate;
}
@Override
public DataBufferFactory factory() {
return this.delegate.factory();
}
@Override
public int indexOf(IntPredicate predicate, int fromIndex) {
return this.delegate.indexOf(predicate, fromIndex);
}
@Override
public int lastIndexOf(IntPredicate predicate, int fromIndex) {
return this.delegate.lastIndexOf(predicate, fromIndex);
}
@Override
public int readableByteCount() {
return this.delegate.readableByteCount();
}
@Override
public int writableByteCount() {
return this.delegate.writableByteCount();
}
@Override
public int capacity() {
return this.delegate.capacity();
}
@Override
public DataBuffer capacity(int capacity) {
return this.delegate.capacity(capacity);
}
@Override
public DataBuffer ensureCapacity(int capacity) {
return this.delegate.ensureCapacity(capacity);
}
@Override
public int readPosition() {
return this.delegate.readPosition();
}
@Override
public DataBuffer readPosition(int readPosition) {
return this.delegate.readPosition(readPosition);
}
@Override
public int writePosition() {
return this.delegate.writePosition();
}
@Override
public DataBuffer writePosition(int writePosition) {
return this.delegate.writePosition(writePosition);
}
@Override
public byte getByte(int index) {
return this.delegate.getByte(index);
}
@Override
public byte read() {
return this.delegate.read();
}
@Override
public DataBuffer read(byte[] destination) {
return this.delegate.read(destination);
}
@Override
public DataBuffer read(byte[] destination, int offset, int length) {
return this.delegate.read(destination, offset, length);
}
@Override
public DataBuffer write(byte b) {
return this.delegate.write(b);
}
@Override
public DataBuffer write(byte[] source) {
return this.delegate.write(source);
}
@Override
public DataBuffer write(byte[] source, int offset, int length) {
return this.delegate.write(source, offset, length);
}
@Override
public DataBuffer write(DataBuffer... buffers) {
return this.delegate.write(buffers);
}
@Override
public DataBuffer write(ByteBuffer... buffers) {
return this.delegate.write(buffers);
}
@Override
public DataBuffer write(CharSequence charSequence,
Charset charset) {
return this.delegate.write(charSequence, charset);
}
@Override
public DataBuffer slice(int index, int length) {
return this.delegate.slice(index, length);
}
@Override
public DataBuffer retainedSlice(int index, int length) {
return this.delegate.retainedSlice(index, length);
}
@Override
public ByteBuffer asByteBuffer() {
return this.delegate.asByteBuffer();
}
@Override
public ByteBuffer asByteBuffer(int index, int length) {
return this.delegate.asByteBuffer(index, length);
}
@Override
public InputStream asInputStream() {
return this.delegate.asInputStream();
}
@Override
public InputStream asInputStream(boolean releaseOnClose) {
return this.delegate.asInputStream(releaseOnClose);
}
@Override
public OutputStream asOutputStream() {
return this.delegate.asOutputStream();
}
@Override
public String toString(Charset charset) {
return this.delegate.toString(charset);
}
@Override
public String toString(int index, int length, Charset charset) {
return this.delegate.toString(index, length, charset);
}
}
......@@ -16,12 +16,6 @@
package org.springframework.core.io.buffer;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.function.IntPredicate;
import org.springframework.util.Assert;
/**
......@@ -29,9 +23,7 @@ import org.springframework.util.Assert;
*
* @author Arjen Poutsma
*/
class LeakAwareDataBuffer implements PooledDataBuffer {
private final DataBuffer delegate;
class LeakAwareDataBuffer extends DataBufferWrapper implements PooledDataBuffer {
private final AssertionError leakError;
......@@ -39,9 +31,8 @@ class LeakAwareDataBuffer implements PooledDataBuffer {
LeakAwareDataBuffer(DataBuffer delegate, LeakAwareDataBufferFactory dataBufferFactory) {
Assert.notNull(delegate, "Delegate must not be null");
super(delegate);
Assert.notNull(dataBufferFactory, "DataBufferFactory must not be null");
this.delegate = delegate;
this.dataBufferFactory = dataBufferFactory;
this.leakError = createLeakError(delegate);
}
......@@ -64,193 +55,38 @@ class LeakAwareDataBuffer implements PooledDataBuffer {
}
public DataBuffer getDelegate() {
return this.delegate;
}
@Override
public boolean isAllocated() {
return this.delegate instanceof PooledDataBuffer &&
((PooledDataBuffer) this.delegate).isAllocated();
DataBuffer delegate = dataBuffer();
return delegate instanceof PooledDataBuffer &&
((PooledDataBuffer) delegate).isAllocated();
}
@Override
public PooledDataBuffer retain() {
if (this.delegate instanceof PooledDataBuffer) {
((PooledDataBuffer) this.delegate).retain();
DataBuffer delegate = dataBuffer();
if (delegate instanceof PooledDataBuffer) {
((PooledDataBuffer) delegate).retain();
}
return this;
}
@Override
public boolean release() {
if (this.delegate instanceof PooledDataBuffer) {
((PooledDataBuffer) this.delegate).release();
DataBuffer delegate = dataBuffer();
if (delegate instanceof PooledDataBuffer) {
((PooledDataBuffer) delegate).release();
}
return isAllocated();
}
// delegation
@Override
public LeakAwareDataBufferFactory factory() {
return this.dataBufferFactory;
}
@Override
public int indexOf(IntPredicate predicate, int fromIndex) {
return this.delegate.indexOf(predicate, fromIndex);
}
@Override
public int lastIndexOf(IntPredicate predicate, int fromIndex) {
return this.delegate.lastIndexOf(predicate, fromIndex);
}
@Override
public int readableByteCount() {
return this.delegate.readableByteCount();
}
@Override
public int writableByteCount() {
return this.delegate.writableByteCount();
}
@Override
public int readPosition() {
return this.delegate.readPosition();
}
@Override
public DataBuffer readPosition(int readPosition) {
return this.delegate.readPosition(readPosition);
}
@Override
public int writePosition() {
return this.delegate.writePosition();
}
@Override
public DataBuffer writePosition(int writePosition) {
return this.delegate.writePosition(writePosition);
}
@Override
public int capacity() {
return this.delegate.capacity();
}
@Override
public DataBuffer capacity(int newCapacity) {
return this.delegate.capacity(newCapacity);
}
@Override
public DataBuffer ensureCapacity(int capacity) {
return this.delegate.ensureCapacity(capacity);
}
@Override
public byte getByte(int index) {
return this.delegate.getByte(index);
}
@Override
public byte read() {
return this.delegate.read();
}
@Override
public DataBuffer read(byte[] destination) {
return this.delegate.read(destination);
}
@Override
public DataBuffer read(byte[] destination, int offset, int length) {
return this.delegate.read(destination, offset, length);
}
@Override
public DataBuffer write(byte b) {
return this.delegate.write(b);
}
@Override
public DataBuffer write(byte[] source) {
return this.delegate.write(source);
}
@Override
public DataBuffer write(byte[] source, int offset, int length) {
return this.delegate.write(source, offset, length);
}
@Override
public DataBuffer write(DataBuffer... buffers) {
return this.delegate.write(buffers);
}
@Override
public DataBuffer write(ByteBuffer... buffers) {
return this.delegate.write(buffers);
}
@Override
public DataBuffer write(CharSequence charSequence, Charset charset) {
return this.delegate.write(charSequence, charset);
}
@Override
public DataBuffer slice(int index, int length) {
return this.delegate.slice(index, length);
}
@Override
public ByteBuffer asByteBuffer() {
return this.delegate.asByteBuffer();
}
@Override
public ByteBuffer asByteBuffer(int index, int length) {
return this.delegate.asByteBuffer(index, length);
}
@Override
public InputStream asInputStream() {
return this.delegate.asInputStream();
}
@Override
public InputStream asInputStream(boolean releaseOnClose) {
return this.delegate.asInputStream(releaseOnClose);
}
@Override
public OutputStream asOutputStream() {
return this.delegate.asOutputStream();
}
@Override
public boolean equals(Object o) {
if (o instanceof LeakAwareDataBuffer) {
LeakAwareDataBuffer other = (LeakAwareDataBuffer) o;
return this.delegate.equals(other.delegate);
}
else {
return false;
}
}
@Override
public int hashCode() {
return this.delegate.hashCode();
}
@Override
public String toString() {
return String.format("LeakAwareDataBuffer (%s)", this.delegate);
return String.format("LeakAwareDataBuffer (%s)", dataBuffer());
}
}
......@@ -130,7 +130,7 @@ public class LeakAwareDataBufferFactory implements DataBufferFactory {
public DataBuffer join(List<? extends DataBuffer> dataBuffers) {
// Remove LeakAwareDataBuffer wrapper so delegate can find native buffers
dataBuffers = dataBuffers.stream()
.map(o -> o instanceof LeakAwareDataBuffer ? ((LeakAwareDataBuffer) o).getDelegate() : o)
.map(o -> o instanceof LeakAwareDataBuffer ? ((LeakAwareDataBuffer) o).dataBuffer() : o)
.collect(Collectors.toList());
return new LeakAwareDataBuffer(this.delegate.join(dataBuffers), this);
}
......
......@@ -17,15 +17,11 @@
package org.springframework.http.server.reactive;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntPredicate;
import javax.net.ssl.SSLSession;
import io.undertow.connector.ByteBufferPool;
......@@ -38,6 +34,7 @@ import reactor.core.publisher.Flux;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DataBufferWrapper;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
......@@ -206,24 +203,23 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest {
}
private static class UndertowDataBuffer implements PooledDataBuffer {
private final DataBuffer dataBuffer;
private static class UndertowDataBuffer extends DataBufferWrapper implements PooledDataBuffer {
private final PooledByteBuffer pooledByteBuffer;
private final AtomicInteger refCount;
public UndertowDataBuffer(DataBuffer dataBuffer, PooledByteBuffer pooledByteBuffer) {
this.dataBuffer = dataBuffer;
super(dataBuffer);
this.pooledByteBuffer = pooledByteBuffer;
this.refCount = new AtomicInteger(1);
}
private UndertowDataBuffer(DataBuffer dataBuffer, PooledByteBuffer pooledByteBuffer,
AtomicInteger refCount) {
super(dataBuffer);
this.refCount = refCount;
this.dataBuffer = dataBuffer;
this.pooledByteBuffer = pooledByteBuffer;
}
......@@ -235,7 +231,7 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest {
@Override
public PooledDataBuffer retain() {
this.refCount.incrementAndGet();
DataBufferUtils.retain(this.dataBuffer);
DataBufferUtils.retain(dataBuffer());
return this;
}
......@@ -244,7 +240,7 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest {
int refCount = this.refCount.decrementAndGet();
if (refCount == 0) {
try {
return DataBufferUtils.release(this.dataBuffer);
return DataBufferUtils.release(dataBuffer());
}
finally {
this.pooledByteBuffer.close();
......@@ -253,157 +249,12 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest {
return false;
}
@Override
public DataBufferFactory factory() {
return this.dataBuffer.factory();
}
@Override
public int indexOf(IntPredicate predicate, int fromIndex) {
return this.dataBuffer.indexOf(predicate, fromIndex);
}
@Override
public int lastIndexOf(IntPredicate predicate, int fromIndex) {
return this.dataBuffer.lastIndexOf(predicate, fromIndex);
}
@Override
public int readableByteCount() {
return this.dataBuffer.readableByteCount();
}
@Override
public int writableByteCount() {
return this.dataBuffer.writableByteCount();
}
@Override
public int readPosition() {
return this.dataBuffer.readPosition();
}
@Override
public DataBuffer readPosition(int readPosition) {
return this.dataBuffer.readPosition(readPosition);
}
@Override
public int writePosition() {
return this.dataBuffer.writePosition();
}
@Override
public DataBuffer writePosition(int writePosition) {
this.dataBuffer.writePosition(writePosition);
return this;
}
@Override
public int capacity() {
return this.dataBuffer.capacity();
}
@Override
public DataBuffer capacity(int newCapacity) {
this.dataBuffer.capacity(newCapacity);
return this;
}
@Override
public DataBuffer ensureCapacity(int capacity) {
this.dataBuffer.ensureCapacity(capacity);
return this;
}
@Override
public byte getByte(int index) {
return this.dataBuffer.getByte(index);
}
@Override
public byte read() {
return this.dataBuffer.read();
}
@Override
public DataBuffer read(byte[] destination) {
this.dataBuffer.read(destination);
return this;
}
@Override
public DataBuffer read(byte[] destination, int offset, int length) {
this.dataBuffer.read(destination, offset, length);
return this;
}
@Override
public DataBuffer write(byte b) {
this.dataBuffer.write(b);
return this;
}
@Override
public DataBuffer write(byte[] source) {
this.dataBuffer.write(source);
return this;
}
@Override
public DataBuffer write(byte[] source, int offset, int length) {
this.dataBuffer.write(source, offset, length);
return this;
}
@Override
public DataBuffer write(DataBuffer... buffers) {
this.dataBuffer.write(buffers);
return this;
}
@Override
public DataBuffer write(ByteBuffer... byteBuffers) {
this.dataBuffer.write(byteBuffers);
return this;
}
@Override
public DataBuffer write(CharSequence charSequence, Charset charset) {
this.dataBuffer.write(charSequence, charset);
return this;
}
@Override
public DataBuffer slice(int index, int length) {
DataBuffer slice = this.dataBuffer.slice(index, length);
DataBuffer slice = dataBuffer().slice(index, length);
return new UndertowDataBuffer(slice, this.pooledByteBuffer, this.refCount);
}
@Override
public ByteBuffer asByteBuffer() {
return this.dataBuffer.asByteBuffer();
}
@Override
public ByteBuffer asByteBuffer(int index, int length) {
return this.dataBuffer.asByteBuffer(index, length);
}
@Override
public InputStream asInputStream() {
return this.dataBuffer.asInputStream();
}
@Override
public InputStream asInputStream(boolean releaseOnClose) {
return this.dataBuffer.asInputStream(releaseOnClose);
}
@Override
public OutputStream asOutputStream() {
return this.dataBuffer.asOutputStream();
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册