提交 c5ca56bd 编写于 作者: R Rossen Stoyanchev

Polish

上级 05830912
......@@ -231,7 +231,7 @@ public class ExchangeResult {
return body
.map(bytes -> {
if (contentType == null) {
return "Unknown content type (" + bytes.length + " bytes)";
return bytes.length + " bytes of content (unknown content-type).";
}
Charset charset = contentType.getCharset();
if (charset != null) {
......@@ -240,7 +240,7 @@ public class ExchangeResult {
if (PRINTABLE_MEDIA_TYPES.stream().anyMatch(contentType::isCompatibleWith)) {
return new String(bytes, StandardCharsets.UTF_8);
}
return "Unknown charset (" + bytes.length + " bytes)";
return bytes.length + " bytes of content.";
})
.defaultIfEmpty("No content")
.onErrorResume(ex -> Mono.just("Failed to obtain content: " + ex.getMessage()))
......
......@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
......@@ -115,10 +114,8 @@ class WiretapConnector implements ClientHttpConnector {
public ExchangeResult createExchangeResult(Duration timeout, @Nullable String uriTemplate) {
return new ExchangeResult(this.request, this.response,
Mono.defer(() -> this.request.getRecorder().getContent()),
Mono.defer(() -> this.response.getRecorder().getContent()),
timeout, uriTemplate);
return new ExchangeResult(this.request, this.response, this.request.getRecorder().getContent(),
this.response.getRecorder().getContent(), timeout, uriTemplate);
}
}
......@@ -137,11 +134,11 @@ class WiretapConnector implements ClientHttpConnector {
@Nullable
private final Flux<? extends Publisher<? extends DataBuffer>> publisherNested;
private final DataBuffer buffer;
private final DataBuffer buffer = bufferFactory.allocateBuffer();
private final MonoProcessor<byte[]> content;
private final MonoProcessor<byte[]> content = MonoProcessor.create();
private volatile boolean subscriberRegistered;
private boolean hasContentConsumer;
public WiretapRecorder(@Nullable Publisher<? extends DataBuffer> publisher,
......@@ -153,22 +150,23 @@ class WiretapConnector implements ClientHttpConnector {
this.publisher = publisher != null ?
Flux.from(publisher)
.doOnSubscribe(this::handleOnSubscribe)
.doOnNext(this::handleOnNext)
.doOnSubscribe(s -> this.hasContentConsumer = true)
.doOnNext(this.buffer::write)
.doOnError(this::handleOnError)
.doOnCancel(this::handleOnComplete)
.doOnComplete(this::handleOnComplete) : null;
this.publisherNested = publisherNested != null ?
Flux.from(publisherNested)
.doOnSubscribe(this::handleOnSubscribe)
.map(p -> Flux.from(p).doOnNext(this::handleOnNext).doOnError(this::handleOnError))
.doOnSubscribe(s -> this.hasContentConsumer = true)
.map(p -> Flux.from(p).doOnNext(this.buffer::write).doOnError(this::handleOnError))
.doOnError(this::handleOnError)
.doOnCancel(this::handleOnComplete)
.doOnComplete(this::handleOnComplete) : null;
this.buffer = bufferFactory.allocateBuffer();
this.content = MonoProcessor.create();
if (publisher == null && publisherNested == null) {
this.content.onComplete();
}
}
......@@ -183,39 +181,26 @@ class WiretapConnector implements ClientHttpConnector {
}
public Mono<byte[]> getContent() {
// No publisher (e.g. request#setComplete)
if (this.publisher == null && this.publisherNested == null) {
return Mono.empty();
}
if (this.content.isTerminated()) {
return this.content;
}
if (this.subscriberRegistered) {
return Mono.error(new IllegalStateException(
"Subscriber registered but content is not yet fully consumed."));
}
else {
// No subscriber, e.g.:
// - mock server request body never consumed (error before read)
// - FluxExchangeResult#getResponseBodyContent called
(this.publisher != null ? this.publisher : this.publisherNested)
.onErrorMap(ex -> new IllegalStateException(
"Content was not been consumed and " +
"an error was raised on attempt to produce it:", ex))
.subscribe();
return Mono.defer(() -> {
if (this.content.isTerminated()) {
return this.content;
}
if (!this.hasContentConsumer) {
// Couple of possible cases:
// 1. Mock server never consumed request body (e.g. error before read)
// 2. FluxExchangeResult: getResponseBodyContent called before getResponseBody
//noinspection ConstantConditions
(this.publisher != null ? this.publisher : this.publisherNested)
.onErrorMap(ex -> new IllegalStateException(
"Content was not been consumed and " +
"an error was raised on attempt to produce it:", ex))
.subscribe();
}
return this.content;
}
});
}
private void handleOnSubscribe(Subscription subscription) {
this.subscriberRegistered = true;
}
private void handleOnNext(DataBuffer nextBuffer) {
this.buffer.write(nextBuffer);
}
private void handleOnError(Throwable ex) {
if (!this.content.isTerminated()) {
this.content.onError(ex);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册