提交 f747ba28 编写于 作者: A Arjen Poutsma

Add DataBufferUtils.matcher and split

Added two methods to DataBufferUtils:

* matcher(byte[]), which returns a Matcher object that can be used to
  find a delimiter in a data buffer.
* split(Publisher<DataBuffer>, byte[] delimiter), which splits a given
  stream of data buffers around a given delimiter.
上级 b74c09d1
...@@ -28,6 +28,9 @@ import java.nio.channels.CompletionHandler; ...@@ -28,6 +28,9 @@ import java.nio.channels.CompletionHandler;
import java.nio.channels.ReadableByteChannel; import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel; import java.nio.channels.WritableByteChannel;
import java.nio.file.StandardOpenOption; import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
...@@ -57,6 +60,8 @@ public abstract class DataBufferUtils { ...@@ -57,6 +60,8 @@ public abstract class DataBufferUtils {
private static final Consumer<DataBuffer> RELEASE_CONSUMER = DataBufferUtils::release; private static final Consumer<DataBuffer> RELEASE_CONSUMER = DataBufferUtils::release;
private static final DataBuffer END_FRAME = new DefaultDataBufferFactory().wrap(new byte[0]);
//--------------------------------------------------------------------- //---------------------------------------------------------------------
// Reading // Reading
...@@ -450,6 +455,123 @@ public abstract class DataBufferUtils { ...@@ -450,6 +455,123 @@ public abstract class DataBufferUtils {
.filter(list -> !list.isEmpty()) .filter(list -> !list.isEmpty())
.map(list -> list.get(0).factory().join(list)) .map(list -> list.get(0).factory().join(list))
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}
/**
* Return a {@link Matcher} for the given delimiters. The matcher can be used to find the
* delimiters in data buffers.
* @param delimiter the delimiter bytes to find
* @return the matcher
* @since 5.2
*/
public static Matcher matcher(byte[] delimiter) {
Assert.isTrue(delimiter.length > 0, "Delimiter must not be empty");
return new KnuthMorrisPrattMatcher(delimiter);
}
/**
* Splits the given stream of data buffers around the given delimiter.
* The returned flux contains data buffers that are terminated by the given delimiter,
* though the delimiter itself is removed.
* @param dataBuffers the input stream of data buffers
* @param delimiter the delimiting byte array
* @return the flux of data buffers created by splitting the given data buffers around the
* given delimiter
* @since 5.2
*/
public static Flux<DataBuffer> split(Publisher<DataBuffer> dataBuffers, byte[] delimiter) {
return split(dataBuffers, delimiter, true);
}
/**
* Splits the given stream of data buffers around the given delimiter.
* The returned flux contains data buffers that are terminated by the given delimiter,
* which is included when {@code stripDelimiter} is {@code true}, or stripped off when
* {@code false}.
* @param dataBuffers the input stream of data buffers
* @param delimiter the delimiting byte array
* @param stripDelimiter whether to include the delimiter at the end of each resulting buffer
* @return the flux of data buffers created by splitting the given data buffers around the
* given delimiter
* @since 5.2
*/
public static Flux<DataBuffer> split(Publisher<DataBuffer> dataBuffers, byte[] delimiter,
boolean stripDelimiter) {
Assert.notNull(dataBuffers, "DataBuffers must not be null");
Assert.isTrue(delimiter.length > 0, "Delimiter must not be empty");
Matcher matcher = matcher(delimiter);
return Flux.from(dataBuffers)
.flatMap(buffer -> endFrameOnDelimiter(buffer, matcher))
.bufferUntil(buffer -> buffer == END_FRAME)
.map(buffers -> joinAndStrip(buffers, delimiter, stripDelimiter))
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}
// Return Flux, because returning List (w/ flatMapIterable) results in memory leaks because
// of pre-fetching.
private static Flux<DataBuffer> endFrameOnDelimiter(DataBuffer dataBuffer, Matcher matcher) {
List<DataBuffer> result = new ArrayList<>();
do {
int endIdx = matcher.match(dataBuffer);
int readPosition = dataBuffer.readPosition();
if (endIdx != -1) {
int length = endIdx + 1 - readPosition ;
result.add(dataBuffer.retainedSlice(readPosition, length));
result.add(END_FRAME);
dataBuffer.readPosition(endIdx + 1);
}
else {
result.add(retain(dataBuffer));
break;
}
}
while (dataBuffer.readableByteCount() > 0);
DataBufferUtils.release(dataBuffer);
return Flux.fromIterable(result);
}
private static DataBuffer joinAndStrip(List<DataBuffer> dataBuffers, byte[] delimiter,
boolean stripDelimiter) {
Assert.state(!dataBuffers.isEmpty(), "DataBuffers should not be empty");
boolean endFrameFound = false;
int lastIdx = dataBuffers.size() - 1;
if (dataBuffers.get(lastIdx) == END_FRAME) {
endFrameFound = true;
dataBuffers.remove(lastIdx);
}
DataBuffer result = dataBuffers.get(0).factory().join(dataBuffers);
if (stripDelimiter && endFrameFound) {
result.writePosition(result.writePosition() - delimiter.length);
}
return result;
}
/**
* Defines an object that matches a data buffer against a delimiter.
* @since 5.2
* @see #match(DataBuffer)
*/
public interface Matcher {
/**
* Returns the position of the final matching delimiter byte that matches the given buffer,
* or {@code -1} if not found.
* @param dataBuffer the buffer in which to search for the delimiter
* @return the position of the final matching delimiter, or {@code -1} if not found.
*/
int match(DataBuffer dataBuffer);
/**
* Return the delimiter used for this matcher.
* @return the delimiter
*/
byte[] delimiter();
} }
...@@ -696,4 +818,69 @@ public abstract class DataBufferUtils { ...@@ -696,4 +818,69 @@ public abstract class DataBufferUtils {
} }
} }
/**
* Implementation of {@link Matcher} that uses the Knuth-Morris-Pratt algorithm.
*
* @see <a href="https://www.nayuki.io/page/knuth-morris-pratt-string-matching">Knuth-Morris-Pratt string matching</a>
*/
private static class KnuthMorrisPrattMatcher implements Matcher {
private final byte[] delimiter;
private final int[] table;
private int matches = 0;
public KnuthMorrisPrattMatcher(byte[] delimiter) {
this.delimiter = Arrays.copyOf(delimiter, delimiter.length);
this.table = longestSuffixPrefixTable(delimiter);
}
private static int[] longestSuffixPrefixTable(byte[] delimiter) {
int[] result = new int[delimiter.length];
result[0] = 0;
for (int i = 1; i < delimiter.length; i++) {
int j = result[i - 1];
while (j > 0 && delimiter[i] != delimiter[j]) {
j = result[j - 1];
}
if (delimiter[i] == delimiter[j]) {
j++;
}
result[i] = j;
}
return result;
}
@Override
public int match(DataBuffer dataBuffer) {
for (int i = dataBuffer.readPosition(); i < dataBuffer.writePosition(); i++) {
byte b = dataBuffer.getByte(i);
while (this.matches > 0 && b != this.delimiter[this.matches]) {
this.matches = this.table[this.matches - 1];
}
if (b == this.delimiter[this.matches]) {
this.matches++;
if (this.matches == this.delimiter.length) {
this.matches = 0;
return i;
}
}
}
return -1;
}
public void reset() {
this.matches = 0;
}
@Override
public byte[] delimiter() {
return Arrays.copyOf(this.delimiter, this.delimiter.length);
}
}
} }
...@@ -49,9 +49,7 @@ import org.springframework.core.io.Resource; ...@@ -49,9 +49,7 @@ import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.support.DataBufferTestUtils; import org.springframework.core.io.buffer.support.DataBufferTestUtils;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.isA;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
/** /**
...@@ -750,6 +748,142 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { ...@@ -750,6 +748,142 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
.verify(); .verify();
} }
@Test
public void matcher() {
DataBuffer foo = stringBuffer("foo");
DataBuffer bar = stringBuffer("bar");
byte[] delims = "ooba".getBytes(StandardCharsets.UTF_8);
DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delims);
int result = matcher.match(foo);
assertEquals(-1, result);
result = matcher.match(bar);
assertEquals(1, result);
release(foo, bar);
}
@Test
public void matcher2() {
DataBuffer foo = stringBuffer("fooobar");
byte[] delims = "oo".getBytes(StandardCharsets.UTF_8);
DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delims);
int result = matcher.match(foo);
assertEquals(2, result);
foo.readPosition(2);
result = matcher.match(foo);
assertEquals(3, result);
foo.readPosition(3);
result = matcher.match(foo);
assertEquals(-1, result);
release(foo);
}
@Test
public void split() {
Mono<DataBuffer> source =
deferStringBuffer("--foo--bar--baz--");
byte[] delimiter = "--".getBytes(StandardCharsets.UTF_8);
Flux<DataBuffer> result = DataBufferUtils.split(source, delimiter);
StepVerifier.create(result)
.consumeNextWith(stringConsumer(""))
.consumeNextWith(stringConsumer("foo"))
.consumeNextWith(stringConsumer("bar"))
.consumeNextWith(stringConsumer("baz"))
.verifyComplete();
}
@Test
public void splitIncludeDelimiter() {
Mono<DataBuffer> source =
deferStringBuffer("--foo--bar--baz--");
byte[] delimiter = "--".getBytes(StandardCharsets.UTF_8);
Flux<DataBuffer> result = DataBufferUtils.split(source, delimiter, false);
StepVerifier.create(result)
.consumeNextWith(stringConsumer("--"))
.consumeNextWith(stringConsumer("foo--"))
.consumeNextWith(stringConsumer("bar--"))
.consumeNextWith(stringConsumer("baz--"))
.verifyComplete();
}
@Test
public void splitErrors() {
Flux<DataBuffer> source = Flux.concat(
deferStringBuffer("foo--"),
deferStringBuffer("bar--"),
Mono.error(new RuntimeException())
);
byte[] delimiter = "--".getBytes(StandardCharsets.UTF_8);
Flux<DataBuffer> result = DataBufferUtils.split(source, delimiter);
StepVerifier.create(result)
.consumeNextWith(stringConsumer("foo"))
.consumeNextWith(stringConsumer("bar"))
.expectError(RuntimeException.class)
.verify();
}
@Test
public void splitCanceled() {
Flux<DataBuffer> source = Flux.concat(
deferStringBuffer("foo--"),
deferStringBuffer("bar--"),
deferStringBuffer("baz")
);
byte[] delimiter = "--".getBytes(StandardCharsets.UTF_8);
Flux<DataBuffer> result = DataBufferUtils.split(source, delimiter);
StepVerifier.create(result)
.thenCancel()
.verify();
}
@Test
public void splitWithoutDemand() {
Flux<DataBuffer> source = Flux.concat(
deferStringBuffer("foo--"),
deferStringBuffer("bar--")
);
byte[] delimiter = "--".getBytes(StandardCharsets.UTF_8);
Flux<DataBuffer> result = DataBufferUtils.split(source, delimiter);
BaseSubscriber<DataBuffer> subscriber = new ZeroDemandSubscriber();
result.subscribe(subscriber);
subscriber.cancel();
}
@Test
public void splitAcrossBuffer() {
Flux<DataBuffer> source = Flux.concat(
deferStringBuffer("foo-"),
deferStringBuffer("-bar-"),
deferStringBuffer("-baz"));
byte[] delimiter = "--".getBytes(StandardCharsets.UTF_8);
Flux<DataBuffer> result = DataBufferUtils.split(source, delimiter);
StepVerifier.create(result)
.consumeNextWith(stringConsumer("foo"))
.consumeNextWith(stringConsumer("bar"))
.consumeNextWith(stringConsumer("baz"))
.verifyComplete();
}
private static class ZeroDemandSubscriber extends BaseSubscriber<DataBuffer> { private static class ZeroDemandSubscriber extends BaseSubscriber<DataBuffer> {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册