From f747ba282aea2f7c542e4a59ef6b30f9016325c5 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Mon, 6 May 2019 16:32:47 +0200 Subject: [PATCH] Add DataBufferUtils.matcher and split Added two methods to DataBufferUtils: * matcher(byte[]), which returns a Matcher object that can be used to find a delimiter in a data buffer. * split(Publisher, byte[] delimiter), which splits a given stream of data buffers around a given delimiter. --- .../core/io/buffer/DataBufferUtils.java | 187 ++++++++++++++++++ .../core/io/buffer/DataBufferUtilsTests.java | 140 ++++++++++++- 2 files changed, 324 insertions(+), 3 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index 96dd72abcd..60fd351e10 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -28,6 +28,9 @@ import java.nio.channels.CompletionHandler; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -57,6 +60,8 @@ public abstract class DataBufferUtils { private static final Consumer RELEASE_CONSUMER = DataBufferUtils::release; + private static final DataBuffer END_FRAME = new DefaultDataBufferFactory().wrap(new byte[0]); + //--------------------------------------------------------------------- // Reading @@ -450,6 +455,123 @@ public abstract class DataBufferUtils { .filter(list -> !list.isEmpty()) .map(list -> list.get(0).factory().join(list)) .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + } + + /** + * Return a {@link Matcher} for the given delimiters. The matcher can be used to find the + * delimiters in data buffers. + * @param delimiter the delimiter bytes to find + * @return the matcher + * @since 5.2 + */ + public static Matcher matcher(byte[] delimiter) { + Assert.isTrue(delimiter.length > 0, "Delimiter must not be empty"); + return new KnuthMorrisPrattMatcher(delimiter); + } + + /** + * Splits the given stream of data buffers around the given delimiter. + * The returned flux contains data buffers that are terminated by the given delimiter, + * though the delimiter itself is removed. + * @param dataBuffers the input stream of data buffers + * @param delimiter the delimiting byte array + * @return the flux of data buffers created by splitting the given data buffers around the + * given delimiter + * @since 5.2 + */ + public static Flux split(Publisher dataBuffers, byte[] delimiter) { + return split(dataBuffers, delimiter, true); + } + + /** + * Splits the given stream of data buffers around the given delimiter. + * The returned flux contains data buffers that are terminated by the given delimiter, + * which is included when {@code stripDelimiter} is {@code true}, or stripped off when + * {@code false}. + * @param dataBuffers the input stream of data buffers + * @param delimiter the delimiting byte array + * @param stripDelimiter whether to include the delimiter at the end of each resulting buffer + * @return the flux of data buffers created by splitting the given data buffers around the + * given delimiter + * @since 5.2 + */ + public static Flux split(Publisher dataBuffers, byte[] delimiter, + boolean stripDelimiter) { + Assert.notNull(dataBuffers, "DataBuffers must not be null"); + Assert.isTrue(delimiter.length > 0, "Delimiter must not be empty"); + + Matcher matcher = matcher(delimiter); + return Flux.from(dataBuffers) + .flatMap(buffer -> endFrameOnDelimiter(buffer, matcher)) + .bufferUntil(buffer -> buffer == END_FRAME) + .map(buffers -> joinAndStrip(buffers, delimiter, stripDelimiter)) + .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + } + + // Return Flux, because returning List (w/ flatMapIterable) results in memory leaks because + // of pre-fetching. + private static Flux endFrameOnDelimiter(DataBuffer dataBuffer, Matcher matcher) { + List result = new ArrayList<>(); + do { + int endIdx = matcher.match(dataBuffer); + int readPosition = dataBuffer.readPosition(); + if (endIdx != -1) { + int length = endIdx + 1 - readPosition ; + result.add(dataBuffer.retainedSlice(readPosition, length)); + result.add(END_FRAME); + dataBuffer.readPosition(endIdx + 1); + } + else { + result.add(retain(dataBuffer)); + break; + } + } + while (dataBuffer.readableByteCount() > 0); + + DataBufferUtils.release(dataBuffer); + return Flux.fromIterable(result); + } + + private static DataBuffer joinAndStrip(List dataBuffers, byte[] delimiter, + boolean stripDelimiter) { + + Assert.state(!dataBuffers.isEmpty(), "DataBuffers should not be empty"); + + boolean endFrameFound = false; + int lastIdx = dataBuffers.size() - 1; + if (dataBuffers.get(lastIdx) == END_FRAME) { + endFrameFound = true; + dataBuffers.remove(lastIdx); + } + + DataBuffer result = dataBuffers.get(0).factory().join(dataBuffers); + if (stripDelimiter && endFrameFound) { + result.writePosition(result.writePosition() - delimiter.length); + } + return result; + } + + + /** + * Defines an object that matches a data buffer against a delimiter. + * @since 5.2 + * @see #match(DataBuffer) + */ + public interface Matcher { + + /** + * Returns the position of the final matching delimiter byte that matches the given buffer, + * or {@code -1} if not found. + * @param dataBuffer the buffer in which to search for the delimiter + * @return the position of the final matching delimiter, or {@code -1} if not found. + */ + int match(DataBuffer dataBuffer); + + /** + * Return the delimiter used for this matcher. + * @return the delimiter + */ + byte[] delimiter(); } @@ -696,4 +818,69 @@ public abstract class DataBufferUtils { } } + /** + * Implementation of {@link Matcher} that uses the Knuth-Morris-Pratt algorithm. + * + * @see Knuth-Morris-Pratt string matching + */ + private static class KnuthMorrisPrattMatcher implements Matcher { + + private final byte[] delimiter; + + private final int[] table; + + private int matches = 0; + + + public KnuthMorrisPrattMatcher(byte[] delimiter) { + this.delimiter = Arrays.copyOf(delimiter, delimiter.length); + this.table = longestSuffixPrefixTable(delimiter); + } + + private static int[] longestSuffixPrefixTable(byte[] delimiter) { + int[] result = new int[delimiter.length]; + result[0] = 0; + for (int i = 1; i < delimiter.length; i++) { + int j = result[i - 1]; + while (j > 0 && delimiter[i] != delimiter[j]) { + j = result[j - 1]; + } + if (delimiter[i] == delimiter[j]) { + j++; + } + result[i] = j; + } + return result; + } + + @Override + public int match(DataBuffer dataBuffer) { + for (int i = dataBuffer.readPosition(); i < dataBuffer.writePosition(); i++) { + byte b = dataBuffer.getByte(i); + + while (this.matches > 0 && b != this.delimiter[this.matches]) { + this.matches = this.table[this.matches - 1]; + } + + if (b == this.delimiter[this.matches]) { + this.matches++; + if (this.matches == this.delimiter.length) { + this.matches = 0; + return i; + } + } + } + return -1; + } + + public void reset() { + this.matches = 0; + } + + @Override + public byte[] delimiter() { + return Arrays.copyOf(this.delimiter, this.delimiter.length); + } + } + } diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java index 115d0fce6b..ec76af56af 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java @@ -49,9 +49,7 @@ import org.springframework.core.io.Resource; import org.springframework.core.io.buffer.support.DataBufferTestUtils; import static org.junit.Assert.*; -import static org.mockito.ArgumentMatchers.*; -import static org.mockito.Mockito.anyLong; -import static org.mockito.Mockito.isA; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; /** @@ -750,6 +748,142 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { .verify(); } + @Test + public void matcher() { + DataBuffer foo = stringBuffer("foo"); + DataBuffer bar = stringBuffer("bar"); + + byte[] delims = "ooba".getBytes(StandardCharsets.UTF_8); + DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delims); + int result = matcher.match(foo); + assertEquals(-1, result); + result = matcher.match(bar); + assertEquals(1, result); + + + release(foo, bar); + } + + @Test + public void matcher2() { + DataBuffer foo = stringBuffer("fooobar"); + + byte[] delims = "oo".getBytes(StandardCharsets.UTF_8); + DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delims); + int result = matcher.match(foo); + assertEquals(2, result); + foo.readPosition(2); + result = matcher.match(foo); + assertEquals(3, result); + foo.readPosition(3); + result = matcher.match(foo); + assertEquals(-1, result); + + release(foo); + } + + @Test + public void split() { + Mono source = + deferStringBuffer("--foo--bar--baz--"); + + byte[] delimiter = "--".getBytes(StandardCharsets.UTF_8); + + Flux result = DataBufferUtils.split(source, delimiter); + + StepVerifier.create(result) + .consumeNextWith(stringConsumer("")) + .consumeNextWith(stringConsumer("foo")) + .consumeNextWith(stringConsumer("bar")) + .consumeNextWith(stringConsumer("baz")) + .verifyComplete(); + } + + @Test + public void splitIncludeDelimiter() { + Mono source = + deferStringBuffer("--foo--bar--baz--"); + + byte[] delimiter = "--".getBytes(StandardCharsets.UTF_8); + + Flux result = DataBufferUtils.split(source, delimiter, false); + + StepVerifier.create(result) + .consumeNextWith(stringConsumer("--")) + .consumeNextWith(stringConsumer("foo--")) + .consumeNextWith(stringConsumer("bar--")) + .consumeNextWith(stringConsumer("baz--")) + .verifyComplete(); + } + + @Test + public void splitErrors() { + Flux source = Flux.concat( + deferStringBuffer("foo--"), + deferStringBuffer("bar--"), + Mono.error(new RuntimeException()) + ); + byte[] delimiter = "--".getBytes(StandardCharsets.UTF_8); + + Flux result = DataBufferUtils.split(source, delimiter); + + StepVerifier.create(result) + .consumeNextWith(stringConsumer("foo")) + .consumeNextWith(stringConsumer("bar")) + .expectError(RuntimeException.class) + .verify(); + } + + @Test + public void splitCanceled() { + Flux source = Flux.concat( + deferStringBuffer("foo--"), + deferStringBuffer("bar--"), + deferStringBuffer("baz") + ); + byte[] delimiter = "--".getBytes(StandardCharsets.UTF_8); + + Flux result = DataBufferUtils.split(source, delimiter); + + StepVerifier.create(result) + .thenCancel() + .verify(); + } + + + @Test + public void splitWithoutDemand() { + Flux source = Flux.concat( + deferStringBuffer("foo--"), + deferStringBuffer("bar--") + ); + byte[] delimiter = "--".getBytes(StandardCharsets.UTF_8); + + Flux result = DataBufferUtils.split(source, delimiter); + + BaseSubscriber subscriber = new ZeroDemandSubscriber(); + result.subscribe(subscriber); + subscriber.cancel(); + } + + @Test + public void splitAcrossBuffer() { + Flux source = Flux.concat( + deferStringBuffer("foo-"), + deferStringBuffer("-bar-"), + deferStringBuffer("-baz")); + + byte[] delimiter = "--".getBytes(StandardCharsets.UTF_8); + + Flux result = DataBufferUtils.split(source, delimiter); + + StepVerifier.create(result) + .consumeNextWith(stringConsumer("foo")) + .consumeNextWith(stringConsumer("bar")) + .consumeNextWith(stringConsumer("baz")) + .verifyComplete(); + } + private static class ZeroDemandSubscriber extends BaseSubscriber { -- GitLab