提交 5095ec40 编写于 作者: R Rossen Stoyanchev

takeUntilByteCount actually uses takeUntil

Issue: SPR-17188
上级 542ed81d
......@@ -396,14 +396,11 @@ public abstract class DataBufferUtils {
AtomicLong countDown = new AtomicLong(maxByteCount);
return Flux.from(publisher)
.takeWhile(buffer -> {
int delta = -buffer.readableByteCount();
return countDown.getAndAdd(delta) >= 0;
})
.map(buffer -> {
long count = countDown.get();
long count = countDown.addAndGet(-buffer.readableByteCount());
return count >= 0 ? buffer : buffer.slice(0, buffer.readableByteCount() + (int) count);
});
})
.takeUntil(buffer -> countDown.get() <= 0);
}
/**
......
......@@ -226,19 +226,32 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
@Test
public void takeUntilByteCount() {
DataBuffer foo = stringBuffer("foo");
DataBuffer bar = stringBuffer("bar");
DataBuffer baz = stringBuffer("baz");
Flux<DataBuffer> flux = Flux.just(foo, bar, baz);
Flux<DataBuffer> result = DataBufferUtils.takeUntilByteCount(flux, 5L);
Flux<DataBuffer> result = DataBufferUtils.takeUntilByteCount(
Flux.just(stringBuffer("foo"), stringBuffer("bar")), 5L);
StepVerifier.create(result)
.consumeNextWith(stringConsumer("foo"))
.consumeNextWith(stringConsumer("ba"))
.expectComplete()
.verify(Duration.ofSeconds(5));
}
@Test
public void takeUntilByteCountExact() {
DataBuffer extraBuffer = stringBuffer("baz");
Flux<DataBuffer> result = DataBufferUtils.takeUntilByteCount(
Flux.just(stringBuffer("foo"), stringBuffer("bar"), extraBuffer), 6L);
StepVerifier.create(result)
.consumeNextWith(stringConsumer("foo"))
.consumeNextWith(stringConsumer("bar"))
.expectComplete()
.verify(Duration.ofSeconds(5));
release(baz);
release(extraBuffer);
}
@Test
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册