diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index 83f39f5d8181f32a1521a5656fb5ae081e1a2892..a9e90888622ecdebf4c9c2f350ab7fc468786e2c 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -27,21 +27,17 @@ import java.nio.channels.Channels; import java.nio.channels.CompletionHandler; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; -import java.nio.charset.Charset; import java.nio.file.OpenOption; import java.nio.file.Path; import java.nio.file.StandardOpenOption; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; -import java.util.List; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; -import java.util.function.IntPredicate; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; @@ -572,147 +568,6 @@ public abstract class DataBufferUtils { } } - /** - * Splits the given stream of data buffers around the given delimiter. - * The returned flux contains data buffers that are terminated by the given delimiter, - * though the delimiter itself is removed. - * @param dataBuffers the input stream of data buffers - * @param delimiter the delimiting byte array - * @return the flux of data buffers created by splitting the given data buffers around the - * given delimiter - * @since 5.2 - */ - public static Flux split(Publisher dataBuffers, byte[] delimiter) { - return split(dataBuffers, delimiter, true); - } - - /** - * Splits the given stream of data buffers around the given delimiter. - * The returned flux contains data buffers that are terminated by the given delimiter, - * which is included when {@code stripDelimiter} is {@code false}. - * @param dataBuffers the input stream of data buffers - * @param delimiter the delimiter bytes - * @param stripDelimiter whether to include the delimiter at the end of each resulting buffer - * @return the flux of data buffers created by splitting the given data buffers around the - * given delimiter - * @since 5.2 - */ - public static Flux split(Publisher dataBuffers, byte[] delimiter, - boolean stripDelimiter) { - - return split(dataBuffers, new byte[][]{delimiter}, stripDelimiter); - } - - /** - * Splits the given stream of data buffers around the given delimiters. - * The returned flux contains data buffers that are terminated by any of the given delimiters, - * which are included when {@code stripDelimiter} is {@code false}. - * @param dataBuffers the input stream of data buffers - * @param delimiters the delimiters, one per element - * @param stripDelimiter whether to include the delimiters at the end of each resulting buffer - * @return the flux of data buffers created by splitting the given data buffers around the - * given delimiters - * @since 5.2 - */ - public static Flux split(Publisher dataBuffers, byte[][] delimiters, - boolean stripDelimiter) { - Assert.notNull(dataBuffers, "DataBuffers must not be null"); - Assert.isTrue(delimiters.length > 0, "Delimiter must not be empty"); - - Matcher[] matchers = matchers(delimiters); - - return Flux.from(dataBuffers) - .flatMap(buffer -> endFrameAfterDelimiter(buffer, matchers)) - .bufferUntil(buffer -> buffer instanceof EndFrameBuffer) - .map(buffers -> joinAndStrip(buffers, stripDelimiter)) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); - } - - private static Matcher[] matchers(byte[][] delimiters) { - Assert.isTrue(delimiters.length > 0, "Delimiters must not be empty"); - Matcher[] result = new Matcher[delimiters.length]; - for (int i = 0; i < delimiters.length; i++) { - result[i] = matcher(delimiters[i]); - } - return result; - } - - /** - * Finds the {@link Matcher} with the first match and longest delimiter, and inserts a - * {@link EndFrameBuffer} just after its match. - * - * @param dataBuffer the buffer to find delimiters in - * @param matchers used to find the first delimiters - * @return a flux of buffers, containing {@link EndFrameBuffer} after each delimiter that was - * found in {@code dataBuffer}. Returns Flux, because returning List (w/ flatMapIterable) - * results in memory leaks due to pre-fetching. - */ - private static Flux endFrameAfterDelimiter(DataBuffer dataBuffer, Matcher[] matchers) { - List result = new ArrayList<>(); - do { - int matchedEndIdx = Integer.MAX_VALUE; - byte[] matchedDelimiter = new byte[0]; - for (Matcher matcher : matchers) { - int endIdx = matcher.match(dataBuffer); - if (endIdx != -1 && - endIdx <= matchedEndIdx && - matcher.delimiter().length > matchedDelimiter.length) { - matchedEndIdx = endIdx; - matchedDelimiter = matcher.delimiter(); - } - } - if (matchedDelimiter.length > 0) { - int readPosition = dataBuffer.readPosition(); - int length = matchedEndIdx + 1 - readPosition ; - result.add(dataBuffer.retainedSlice(readPosition, length)); - result.add(new EndFrameBuffer(matchedDelimiter)); - dataBuffer.readPosition(matchedEndIdx + 1); - - for (Matcher matcher : matchers) { - matcher.reset(); - } - } - else { - result.add(retain(dataBuffer)); - break; - } - } - while (dataBuffer.readableByteCount() > 0); - - DataBufferUtils.release(dataBuffer); - return Flux.fromIterable(result); - } - - /** - * Joins the given list of buffers. If the list ends with a {@link EndFrameBuffer}, it is - * removed. If {@code stripDelimiter} is {@code true} and the resulting buffer ends with - * a delimiter, it is removed. - * @param dataBuffers the data buffers to join - * @param stripDelimiter whether to strip the delimiter - * @return the joined buffer - */ - private static DataBuffer joinAndStrip(List dataBuffers, - boolean stripDelimiter) { - - Assert.state(!dataBuffers.isEmpty(), "DataBuffers should not be empty"); - - byte[] matchingDelimiter = null; - - int lastIdx = dataBuffers.size() - 1; - DataBuffer lastBuffer = dataBuffers.get(lastIdx); - if (lastBuffer instanceof EndFrameBuffer) { - matchingDelimiter = ((EndFrameBuffer) lastBuffer).delimiter(); - dataBuffers.remove(lastIdx); - } - - DataBuffer result = dataBuffers.get(0).factory().join(dataBuffers); - - if (stripDelimiter && matchingDelimiter != null) { - result.writePosition(result.writePosition() - matchingDelimiter.length); - } - return result; - } - /** * Defines an object that matches a data buffer against a delimiter. @@ -1107,167 +962,4 @@ public abstract class DataBufferUtils { } } - - - private static class EndFrameBuffer implements DataBuffer { - - private static final DataBuffer BUFFER = new DefaultDataBufferFactory().wrap(new byte[0]); - - private byte[] delimiter; - - - public EndFrameBuffer(byte[] delimiter) { - this.delimiter = delimiter; - } - - public byte[] delimiter() { - return this.delimiter; - } - - @Override - public DataBufferFactory factory() { - return BUFFER.factory(); - } - - @Override - public int indexOf(IntPredicate predicate, int fromIndex) { - return BUFFER.indexOf(predicate, fromIndex); - } - - @Override - public int lastIndexOf(IntPredicate predicate, int fromIndex) { - return BUFFER.lastIndexOf(predicate, fromIndex); - } - - @Override - public int readableByteCount() { - return BUFFER.readableByteCount(); - } - - @Override - public int writableByteCount() { - return BUFFER.writableByteCount(); - } - - @Override - public int capacity() { - return BUFFER.capacity(); - } - - @Override - public DataBuffer capacity(int capacity) { - return BUFFER.capacity(capacity); - } - - @Override - public DataBuffer ensureCapacity(int capacity) { - return BUFFER.ensureCapacity(capacity); - } - - @Override - public int readPosition() { - return BUFFER.readPosition(); - } - - @Override - public DataBuffer readPosition(int readPosition) { - return BUFFER.readPosition(readPosition); - } - - @Override - public int writePosition() { - return BUFFER.writePosition(); - } - - @Override - public DataBuffer writePosition(int writePosition) { - return BUFFER.writePosition(writePosition); - } - - @Override - public byte getByte(int index) { - return BUFFER.getByte(index); - } - - @Override - public byte read() { - return BUFFER.read(); - } - - @Override - public DataBuffer read(byte[] destination) { - return BUFFER.read(destination); - } - - @Override - public DataBuffer read(byte[] destination, int offset, int length) { - return BUFFER.read(destination, offset, length); - } - - @Override - public DataBuffer write(byte b) { - return BUFFER.write(b); - } - - @Override - public DataBuffer write(byte[] source) { - return BUFFER.write(source); - } - - @Override - public DataBuffer write(byte[] source, int offset, int length) { - return BUFFER.write(source, offset, length); - } - - @Override - public DataBuffer write(DataBuffer... buffers) { - return BUFFER.write(buffers); - } - - @Override - public DataBuffer write(ByteBuffer... buffers) { - return BUFFER.write(buffers); - } - - @Override - public DataBuffer write(CharSequence charSequence, Charset charset) { - return BUFFER.write(charSequence, charset); - } - - @Override - public DataBuffer slice(int index, int length) { - return BUFFER.slice(index, length); - } - - @Override - public DataBuffer retainedSlice(int index, int length) { - return BUFFER.retainedSlice(index, length); - } - - @Override - public ByteBuffer asByteBuffer() { - return BUFFER.asByteBuffer(); - } - - @Override - public ByteBuffer asByteBuffer(int index, int length) { - return BUFFER.asByteBuffer(index, length); - } - - @Override - public InputStream asInputStream() { - return BUFFER.asInputStream(); - } - - @Override - public InputStream asInputStream(boolean releaseOnClose) { - return BUFFER.asInputStream(releaseOnClose); - } - - @Override - public OutputStream asOutputStream() { - return BUFFER.asOutputStream(); - } - } - } diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java index 0e6db5903fe153d57069cd8e57317e49ef212e7c..15d2de4b3aac100d209468806a8ffb4f055131d6 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java @@ -809,127 +809,6 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { release(foo); } - @Test - public void split() { - Mono source = - deferStringBuffer("--foo--bar--baz--"); - - byte[] delimiter = "--".getBytes(StandardCharsets.UTF_8); - - Flux result = DataBufferUtils.split(source, delimiter); - - StepVerifier.create(result) - .consumeNextWith(stringConsumer("")) - .consumeNextWith(stringConsumer("foo")) - .consumeNextWith(stringConsumer("bar")) - .consumeNextWith(stringConsumer("baz")) - .verifyComplete(); - } - - @Test - public void splitIncludeDelimiter() { - Mono source = - deferStringBuffer("--foo--bar--baz--"); - - byte[] delimiter = "--".getBytes(StandardCharsets.UTF_8); - - Flux result = DataBufferUtils.split(source, delimiter, false); - - StepVerifier.create(result) - .consumeNextWith(stringConsumer("--")) - .consumeNextWith(stringConsumer("foo--")) - .consumeNextWith(stringConsumer("bar--")) - .consumeNextWith(stringConsumer("baz--")) - .verifyComplete(); - } - - @Test - public void splitMultipleDelimiters() { - Mono source = - deferStringBuffer("foo␤bar␍␤baz␤"); - - byte[][] delimiters = new byte[][]{ - "␤".getBytes(StandardCharsets.UTF_8), - "␍␤".getBytes(StandardCharsets.UTF_8) - }; - - Flux result = DataBufferUtils.split(source, delimiters, false); - - StepVerifier.create(result) - .consumeNextWith(stringConsumer("foo␤")) - .consumeNextWith(stringConsumer("bar␍␤")) - .consumeNextWith(stringConsumer("baz␤")) - .verifyComplete(); - } - - @Test - public void splitErrors() { - Flux source = Flux.concat( - deferStringBuffer("foo--"), - deferStringBuffer("bar--"), - Mono.error(new RuntimeException()) - ); - byte[] delimiter = "--".getBytes(StandardCharsets.UTF_8); - - Flux result = DataBufferUtils.split(source, delimiter); - - StepVerifier.create(result) - .consumeNextWith(stringConsumer("foo")) - .consumeNextWith(stringConsumer("bar")) - .expectError(RuntimeException.class) - .verify(); - } - - @Test - public void splitCanceled() { - Flux source = Flux.concat( - deferStringBuffer("foo--"), - deferStringBuffer("bar--"), - deferStringBuffer("baz") - ); - byte[] delimiter = "--".getBytes(StandardCharsets.UTF_8); - - Flux result = DataBufferUtils.split(source, delimiter); - - StepVerifier.create(result) - .thenCancel() - .verify(); - } - - - @Test - public void splitWithoutDemand() { - Flux source = Flux.concat( - deferStringBuffer("foo--"), - deferStringBuffer("bar--") - ); - byte[] delimiter = "--".getBytes(StandardCharsets.UTF_8); - - Flux result = DataBufferUtils.split(source, delimiter); - - BaseSubscriber subscriber = new ZeroDemandSubscriber(); - result.subscribe(subscriber); - subscriber.cancel(); - } - - @Test - public void splitAcrossBuffer() { - Flux source = Flux.concat( - deferStringBuffer("foo-"), - deferStringBuffer("-bar-"), - deferStringBuffer("-baz")); - - byte[] delimiter = "--".getBytes(StandardCharsets.UTF_8); - - Flux result = DataBufferUtils.split(source, delimiter); - - StepVerifier.create(result) - .consumeNextWith(stringConsumer("foo")) - .consumeNextWith(stringConsumer("bar")) - .consumeNextWith(stringConsumer("baz")) - .verifyComplete(); - } - private static class ZeroDemandSubscriber extends BaseSubscriber {