提交 4d914413 编写于 作者: S Sebastien Deleuze

Upgrade to Reactor Core 3.1

Issue: SPR-15318
上级 f9633674
......@@ -79,7 +79,7 @@ configure(allprojects) { project ->
ext.poiVersion = "3.15"
ext.protobufVersion = "3.2.0"
ext.quartzVersion = "2.2.3"
ext.reactorVersion = "Aluminium-BUILD-SNAPSHOT"
ext.reactorVersion = "Bismuth-BUILD-SNAPSHOT"
ext.romeVersion = "1.7.1"
ext.rxjavaVersion = '1.2.9'
ext.rxjavaAdapterVersion = '1.2.1'
......
......@@ -85,7 +85,7 @@ public class ResourceRegionEncoder extends AbstractEncoder<ResourceRegion> {
if (inputStream instanceof Mono) {
return ((Mono<? extends ResourceRegion>) inputStream)
.flatMap(region -> writeResourceRegion(region, bufferFactory));
.flatMapMany(region -> writeResourceRegion(region, bufferFactory));
}
else {
Assert.notNull(hints, "'hints' must not be null");
......
......@@ -114,7 +114,7 @@ public abstract class DataBufferUtils {
ByteBuffer byteBuffer = ByteBuffer.allocate(bufferSize);
return Flux.create(emitter -> {
emitter.setCancellation(() -> closeChannel(channel));
emitter.onDispose(() -> closeChannel(channel));
AsynchronousFileChannelCompletionHandler completionHandler =
new AsynchronousFileChannelCompletionHandler(emitter, position,
dataBufferFactory, byteBuffer);
......
......@@ -146,7 +146,7 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
.doOnNext(updateConnectMono(connectMono))
.doOnError(updateConnectMono(connectMono))
.doOnError(handler::afterConnectFailure) // report all connect failures to the handler
.then(NettyContext::onClose) // post-connect issues
.flatMap(NettyContext::onClose) // post-connect issues
.retryWhen(reconnectFunction(strategy))
.repeatWhen(reconnectFunction(strategy))
.subscribe();
......
......@@ -97,7 +97,7 @@ public class FormHttpMessageWriter implements HttpMessageWriter<MultiValueMap<St
.from(inputStream)
.single()
.map(form -> generateForm(form, charset))
.then(value -> {
.flatMap(value -> {
ByteBuffer byteBuffer = charset.encode(value);
DataBuffer buffer = message.bufferFactory().wrap(byteBuffer);
message.getHeaders().setContentLength(byteBuffer.remaining());
......
......@@ -108,7 +108,7 @@ public class ResourceHttpMessageWriter implements HttpMessageWriter<Resource> {
public Mono<Void> write(Publisher<? extends Resource> inputStream, ResolvableType elementType,
MediaType mediaType, ReactiveHttpOutputMessage message, Map<String, Object> hints) {
return Mono.from(inputStream).then(resource ->
return Mono.from(inputStream).flatMap(resource ->
writeResource(resource, elementType, mediaType, message, hints));
}
......@@ -191,7 +191,7 @@ public class ResourceHttpMessageWriter implements HttpMessageWriter<Resource> {
return response.setComplete();
}
return Mono.from(inputStream).then(resource -> {
return Mono.from(inputStream).flatMap(resource -> {
if (ranges.isEmpty()) {
return writeResource(resource, elementType, mediaType, response, hints);
......
......@@ -99,7 +99,7 @@ public class XmlEventDecoder extends AbstractDecoder<XMLEvent> {
else {
Mono<DataBuffer> singleBuffer = flux.reduce(DataBuffer::write);
return singleBuffer.
flatMap(dataBuffer -> {
flatMapMany(dataBuffer -> {
try {
InputStream is = dataBuffer.asInputStream();
XMLEventReader eventReader = inputFactory.createXMLEventReader(is);
......
......@@ -21,6 +21,7 @@ import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Exceptions;
import reactor.core.publisher.MonoSource;
import reactor.core.publisher.Operators;
......@@ -56,7 +57,7 @@ public class ChannelSendOperator<T> extends MonoSource<T, Void> {
@SuppressWarnings("deprecation")
private class WriteWithBarrier extends Operators.SubscriberAdapter<T, Void> implements Publisher<T> {
private class WriteWithBarrier extends SubscriberAdapter<T, Void> implements Publisher<T> {
/**
* We've at at least one emission, we've called the write function, the write
......@@ -210,6 +211,134 @@ public class ChannelSendOperator<T> extends MonoSource<T, Void> {
}
}
// TODO Remove this copy of Reactor 3.0.x Operators.SubscriberAdapter
private static class SubscriberAdapter<I, O> implements Subscriber<I>, Subscription {
protected final Subscriber<? super O> subscriber;
protected Subscription subscription;
public SubscriberAdapter(Subscriber<? super O> subscriber) {
this.subscriber = subscriber;
}
public Subscriber<? super O> downstream() {
return subscriber;
}
@Override
public final void cancel() {
try {
doCancel();
} catch (Throwable throwable) {
doOnSubscriberError(Operators.onOperatorError(subscription, throwable));
}
}
@Override
public final void onComplete() {
try {
doComplete();
} catch (Throwable throwable) {
doOnSubscriberError(Operators.onOperatorError(throwable));
}
}
@Override
public final void onError(Throwable t) {
if (t == null) {
throw Exceptions.argumentIsNullException();
}
doError(t);
}
@Override
public final void onNext(I i) {
if (i == null) {
throw Exceptions.argumentIsNullException();
}
try {
doNext(i);
}
catch (Throwable throwable) {
doOnSubscriberError(Operators.onOperatorError(subscription, throwable, i));
}
}
@Override
public final void onSubscribe(Subscription s) {
if (Operators.validate(subscription, s)) {
try {
subscription = s;
doOnSubscribe(s);
}
catch (Throwable throwable) {
doOnSubscriberError(Operators.onOperatorError(s, throwable));
}
}
}
@Override
public final void request(long n) {
try {
Operators.checkRequest(n);
doRequest(n);
} catch (Throwable throwable) {
doCancel();
doOnSubscriberError(Operators.onOperatorError(throwable));
}
}
@Override
public String toString() {
return getClass().getSimpleName();
}
/**
* Hook for further processing of onSubscribe's Subscription.
* @param subscription the subscription to optionally process
*/
protected void doOnSubscribe(Subscription subscription) {
subscriber.onSubscribe(this);
}
public Subscription upstream() {
return subscription;
}
@SuppressWarnings("unchecked")
protected void doNext(I i) {
subscriber.onNext((O) i);
}
protected void doError(Throwable throwable) {
subscriber.onError(throwable);
}
protected void doOnSubscriberError(Throwable throwable){
subscriber.onError(throwable);
}
protected void doComplete() {
subscriber.onComplete();
}
protected void doRequest(long n) {
Subscription s = this.subscription;
if (s != null) {
s.request(n);
}
}
protected void doCancel() {
Subscription s = this.subscription;
if (s != null) {
this.subscription = null;
s.cancel();
}
}
}
private class DownstreamBridge implements Subscriber<Void> {
......
......@@ -72,7 +72,7 @@ public class WebExchangeDataBinder extends WebDataBinder {
.map(this::getParamsToBind)
.doOnNext(values -> values.putAll(getMultipartFiles(exchange)))
.doOnNext(values -> values.putAll(getExtraValuesToBind(exchange)))
.then(values -> {
.flatMap(values -> {
doBind(new MutablePropertyValues(values));
return Mono.empty();
});
......
......@@ -81,7 +81,7 @@ public class HiddenHttpMethodFilter implements WebFilter {
String method = formData.getFirst(this.methodParamName);
return StringUtils.hasLength(method) ? mapExchange(exchange, method) : exchange;
})
.then((exchange1) -> chain.filter(exchange1));
.flatMap((exchange1) -> chain.filter(exchange1));
}
private ServerWebExchange mapExchange(ServerWebExchange exchange, String methodParamValue) {
......
......@@ -104,7 +104,7 @@ public class DefaultWebSessionManager implements WebSessionManager {
Flux.fromIterable(getSessionIdResolver().resolveSessionIds(exchange))
.concatMap(this.sessionStore::retrieveSession)
.next()
.then(session -> validateSession(exchange, session))
.flatMap(session -> validateSession(exchange, session))
.otherwiseIfEmpty(createSession(exchange))
.map(session -> extendSession(exchange, session)));
}
......
......@@ -29,7 +29,6 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Signal;
import static org.junit.Assert.assertEquals;
......@@ -142,37 +141,41 @@ public class ChannelSendOperatorTests {
public Publisher<Void> send(Publisher<String> publisher) {
return subscriber -> {
Executors.newSingleThreadScheduledExecutor().schedule(() -> publisher.subscribe(new WriteSubscriber(subscriber)),
50, TimeUnit.MILLISECONDS);
};
return subscriber -> Executors.newSingleThreadScheduledExecutor().schedule(() ->
publisher.subscribe(new WriteSubscriber(subscriber)),50, TimeUnit.MILLISECONDS);
}
private class WriteSubscriber extends Operators.SubscriberAdapter<String, Void> {
private class WriteSubscriber implements Subscriber<String> {
private Subscription subscription;
private final Subscriber<? super Void> subscriber;
public WriteSubscriber(Subscriber<? super Void> subscriber) {
super(subscriber);
this.subscriber = subscriber;
}
@Override
protected void doOnSubscribe(Subscription subscription) {
subscription.request(1);
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void doNext(String item) {
public void onNext(String item) {
items.add(item);
this.subscription.request(1);
}
@Override
public void doError(Throwable ex) {
public void onError(Throwable ex) {
error = ex;
this.subscriber.onError(ex);
}
@Override
public void doComplete() {
public void onComplete() {
completed = true;
this.subscriber.onComplete();
}
......
......@@ -163,7 +163,7 @@ public class FilteringWebHandlerTests {
@Override
public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
return doAsyncWork().then(asyncResult -> {
return doAsyncWork().flatMap(asyncResult -> {
logger.debug("Async result: " + asyncResult);
return chain.filter(exchange);
});
......
......@@ -126,8 +126,8 @@ public class DispatcherHandler implements WebHandler, ApplicationContextAware {
.concatMap(mapping -> mapping.getHandler(exchange))
.next()
.otherwiseIfEmpty(Mono.error(HANDLER_NOT_FOUND_EXCEPTION))
.then(handler -> invokeHandler(exchange, handler))
.then(result -> handleResult(exchange, result));
.flatMap(handler -> invokeHandler(exchange, handler))
.flatMap(result -> handleResult(exchange, result));
}
private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
......@@ -141,7 +141,7 @@ public class DispatcherHandler implements WebHandler, ApplicationContextAware {
private Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {
return getResultHandler(result).handleResult(exchange, result)
.otherwise(ex -> result.applyExceptionHandler(ex).then(exceptionResult ->
.otherwise(ex -> result.applyExceptionHandler(ex).flatMap(exceptionResult ->
getResultHandler(exceptionResult).handleResult(exchange, exceptionResult)));
}
......
......@@ -346,17 +346,17 @@ class DefaultWebClient implements WebClient {
@Override
public <T> Mono<T> bodyToMono(Class<T> bodyType) {
return this.responseMono.then(clientResponse -> clientResponse.bodyToMono(bodyType));
return this.responseMono.flatMap(clientResponse -> clientResponse.bodyToMono(bodyType));
}
@Override
public <T> Flux<T> bodyToFlux(Class<T> elementType) {
return this.responseMono.flatMap(clientResponse -> clientResponse.bodyToFlux(elementType));
return this.responseMono.flatMapMany(clientResponse -> clientResponse.bodyToFlux(elementType));
}
@Override
public <T> Mono<ResponseEntity<T>> bodyToEntity(Class<T> bodyType) {
return this.responseMono.then(response ->
return this.responseMono.flatMap(response ->
response.bodyToMono(bodyType).map(body -> {
HttpHeaders headers = response.headers().asHttpHeaders();
return new ResponseEntity<>(body, headers, response.statusCode());
......@@ -366,7 +366,7 @@ class DefaultWebClient implements WebClient {
@Override
public <T> Mono<ResponseEntity<List<T>>> bodyToEntityList(Class<T> responseType) {
return this.responseMono.then(response ->
return this.responseMono.flatMap(response ->
response.bodyToFlux(responseType).collectList().map(body -> {
HttpHeaders headers = response.headers().asHttpHeaders();
return new ResponseEntity<>(body, headers, response.statusCode());
......
......@@ -77,7 +77,7 @@ public interface ExchangeFilterFunction {
Mono<ClientRequest>> requestProcessor) {
Assert.notNull(requestProcessor, "'requestProcessor' must not be null");
return (request, next) -> requestProcessor.apply(request).then(next::exchange);
return (request, next) -> requestProcessor.apply(request).flatMap(next::exchange);
}
/**
......@@ -90,7 +90,7 @@ public interface ExchangeFilterFunction {
Mono<ClientResponse>> responseProcessor) {
Assert.notNull(responseProcessor, "'responseProcessor' must not be null");
return (request, next) -> next.exchange(request).then(responseProcessor);
return (request, next) -> next.exchange(request).flatMap(responseProcessor);
}
}
......@@ -170,7 +170,7 @@ class DefaultRenderingResponseBuilder implements RenderingResponse.Builder {
.next()
.otherwiseIfEmpty(Mono.error(new IllegalArgumentException("Could not resolve view with name '" +
name() +"'")))
.then(view -> view.render(model(), contentType, exchange));
.flatMap(view -> view.render(model(), contentType, exchange));
}
private Locale resolveLocale(ServerWebExchange exchange, HandlerStrategies strategies) {
......
......@@ -82,7 +82,7 @@ public interface HandlerFilterFunction<T extends ServerResponse, R extends Serve
Function<ServerRequest, Mono<ServerRequest>> requestProcessor) {
Assert.notNull(requestProcessor, "'requestProcessor' must not be null");
return (request, next) -> requestProcessor.apply(request).then(next::handle);
return (request, next) -> requestProcessor.apply(request).flatMap(next::handle);
}
/**
......@@ -95,7 +95,7 @@ public interface HandlerFilterFunction<T extends ServerResponse, R extends Serve
Function<T, Mono<R>> responseProcessor) {
Assert.notNull(responseProcessor, "'responseProcessor' must not be null");
return (request, next) -> next.handle(request).then(responseProcessor);
return (request, next) -> next.handle(request).flatMap(responseProcessor);
}
......
......@@ -232,8 +232,8 @@ public abstract class RouterFunctions {
addAttributes(exchange, request);
return routerFunction.route(request)
.defaultIfEmpty(notFound())
.then(handlerFunction -> wrapException(() -> handlerFunction.handle(request)))
.then(response -> wrapException(() -> response.writeTo(exchange, strategies)))
.flatMap(handlerFunction -> wrapException(() -> handlerFunction.handle(request)))
.flatMap(response -> wrapException(() -> response.writeTo(exchange, strategies)))
.otherwise(ResponseStatusException.class,
ex -> {
exchange.getResponse().setStatusCode(ex.getStatus());
......
......@@ -106,7 +106,7 @@ public class AppCacheManifestTransformer extends ResourceTransformerSupport {
ResourceTransformerChain chain) {
return chain.transform(exchange, inputResource)
.then(resource -> {
.flatMap(resource -> {
String name = resource.getFilename();
if (!this.fileExtension.equals(StringUtils.getFilenameExtension(name))) {
return Mono.just(resource);
......@@ -124,7 +124,7 @@ public class AppCacheManifestTransformer extends ResourceTransformerSupport {
return Flux.generate(new LineGenerator(content))
.concatMap(info -> processLine(info, exchange, resource, chain))
.collect(() -> new LineAggregator(resource, content), LineAggregator::add)
.then(aggregator -> Mono.just(aggregator.createResource()));
.flatMap(aggregator -> Mono.just(aggregator.createResource()));
});
}
......
......@@ -71,7 +71,7 @@ public class CssLinkResourceTransformer extends ResourceTransformerSupport {
ResourceTransformerChain transformerChain) {
return transformerChain.transform(exchange, resource)
.then(newResource -> {
.flatMap(newResource -> {
String filename = newResource.getFilename();
if (!"css".equals(StringUtils.getFilenameExtension(filename)) ||
resource instanceof GzipResourceResolver.GzippedResource) {
......@@ -115,7 +115,7 @@ public class CssLinkResourceTransformer extends ResourceTransformerSupport {
writer.write(chunk);
return writer;
})
.then(writer -> {
.flatMap(writer -> {
byte[] newContent = writer.toString().getBytes(DEFAULT_CHARSET);
return Mono.just(new TransformedResource(resource, newContent));
});
......
......@@ -281,7 +281,7 @@ public class ResourceWebHandler
exchange.getResponse().setStatusCode(HttpStatus.NOT_FOUND);
return Mono.empty();
}))
.then(resource -> {
.flatMap(resource -> {
try {
if (HttpMethod.OPTIONS.equals(exchange.getRequest().getMethod())) {
exchange.getResponse().getHeaders().add("Allow", "GET,HEAD,OPTIONS");
......@@ -378,7 +378,7 @@ public class ResourceWebHandler
ResourceResolverChain resolveChain = createResolverChain();
return resolveChain.resolveResource(exchange, path, getLocations())
.then(resource -> {
.flatMap(resource -> {
ResourceTransformerChain transformerChain = createTransformerChain(resolveChain);
return transformerChain.transform(exchange, resource);
});
......
......@@ -185,7 +185,7 @@ public class VersionResourceResolver extends AbstractResourceResolver {
}
return chain.resolveResource(exchange, simplePath, locations)
.then(baseResource -> {
.flatMap(baseResource -> {
String actualVersion = versionStrategy.getResourceVersion(baseResource);
if (candidateVersion.equals(actualVersion)) {
if (logger.isTraceEnabled()) {
......@@ -208,7 +208,7 @@ public class VersionResourceResolver extends AbstractResourceResolver {
List<? extends Resource> locations, ResourceResolverChain chain) {
return chain.resolveUrlPath(resourceUrlPath, locations)
.then(baseUrl -> {
.flatMap(baseUrl -> {
if (StringUtils.hasText(baseUrl)) {
VersionStrategy versionStrategy = getStrategyForPath(resourceUrlPath);
if (versionStrategy == null) {
......
......@@ -60,7 +60,7 @@ public class SimpleHandlerAdapter implements HandlerAdapter {
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
WebHandler webHandler = (WebHandler) handler;
Mono<Void> mono = webHandler.handle(exchange);
return mono.then(aVoid -> Mono.empty());
return mono.then(Mono.empty());
}
}
......@@ -113,7 +113,7 @@ public class InvocableHandlerMethod extends HandlerMethod {
public Mono<HandlerResult> invoke(ServerWebExchange exchange, BindingContext bindingContext,
Object... providedArgs) {
return resolveArguments(exchange, bindingContext, providedArgs).then(args -> {
return resolveArguments(exchange, bindingContext, providedArgs).flatMap(args -> {
try {
Object value = doInvoke(args);
HandlerResult result = new HandlerResult(this, value, getReturnType(), bindingContext);
......
......@@ -117,7 +117,7 @@ public class ModelAttributeMethodArgumentResolver extends HandlerMethodArgumentR
MonoProcessor<BindingResult> bindingResultMono = MonoProcessor.create();
model.put(BindingResult.MODEL_KEY_PREFIX + name, bindingResultMono);
return valueMono.then(value -> {
return valueMono.flatMap(value -> {
WebExchangeDataBinder binder = context.createDataBinder(exchange, value, name);
return binder.bind(exchange)
.doOnError(bindingResultMono::onError)
......@@ -188,7 +188,7 @@ public class ModelAttributeMethodArgumentResolver extends HandlerMethodArgumentR
}
// A single data class constructor -> resolve constructor arguments from request parameters.
return exchange.getRequestParams().then(requestParams -> {
return exchange.getRequestParams().flatMap(requestParams -> {
ConstructorProperties cp = ctor.getAnnotation(ConstructorProperties.class);
String[] paramNames = (cp != null ? cp.value() : parameterNameDiscoverer.getParameterNames(ctor));
Assert.state(paramNames != null, () -> "Cannot resolve parameter names for constructor " + ctor);
......
......@@ -86,7 +86,7 @@ class ModelInitializer {
.map(object -> (HandlerResult) object)
.map(handlerResult -> handleResult(handlerResult, bindingContext))
.collect(Collectors.toList());
}).then(completionList -> Mono.when(completionList));
}).flatMap(completionList -> Mono.when(completionList));
}
private Mono<Void> handleResult(HandlerResult handlerResult, BindingContext bindingContext) {
......
......@@ -110,7 +110,7 @@ public class ResponseEntityResultHandler extends AbstractMessageWriterResultHand
bodyParameter = result.getReturnTypeSource().nested();
}
return returnValueMono.then(returnValue -> {
return returnValueMono.flatMap(returnValue -> {
Assert.isInstanceOf(HttpEntity.class, returnValue, "HttpEntity expected");
HttpEntity<?> httpEntity = (HttpEntity<?>) returnValue;
......
......@@ -155,7 +155,7 @@ public abstract class AbstractView implements View, ApplicationContextAware {
exchange.getResponse().getHeaders().setContentType(contentType);
}
return getModelAttributes(model, exchange).then(mergedModel -> {
return getModelAttributes(model, exchange).flatMap(mergedModel -> {
// Expose RequestContext?
if (this.requestContextAttribute != null) {
mergedModel.put(this.requestContextAttribute, createRequestContext(exchange, mergedModel));
......
......@@ -194,7 +194,7 @@ public class ViewResolutionResultHandler extends HandlerResultHandlerSupport
return valueMono
.otherwiseIfEmpty(exchange.isNotModified() ? Mono.empty() : NO_VALUE_MONO)
.then(returnValue -> {
.flatMap(returnValue -> {
Mono<List<View>> viewsMono;
Model model = result.getModel();
......@@ -242,7 +242,7 @@ public class ViewResolutionResultHandler extends HandlerResultHandlerSupport
updateBindingContext(result.getBindingContext(), exchange);
return viewsMono.then(views -> render(views, model.asMap(), exchange));
return viewsMono.flatMap(views -> render(views, model.asMap(), exchange));
});
}
......
......@@ -81,7 +81,7 @@ public class ReactorNettyWebSocketClient extends WebSocketClientSupport implemen
.ws(url.toString(),
nettyHeaders -> setNettyHeaders(headers, nettyHeaders),
StringUtils.arrayToCommaDelimitedString(protocols))
.then(response -> {
.flatMap(response -> {
HandshakeInfo info = afterHandshake(url, toHttpHeaders(response));
ByteBufAllocator allocator = response.channel().alloc();
NettyDataBufferFactory factory = new NettyDataBufferFactory(allocator);
......
......@@ -63,7 +63,7 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest
Mono<String> result = this.webClient.get()
.uri("/write-and-flush")
.exchange()
.flatMap(response -> response.body(BodyExtractors.toFlux(String.class)))
.flatMapMany(response -> response.body(BodyExtractors.toFlux(String.class)))
.takeUntil(s -> s.endsWith("data1"))
.reduce((s1, s2) -> s1 + s2);
......@@ -78,7 +78,7 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest
Mono<String> result = this.webClient.get()
.uri("/write-and-complete")
.exchange()
.flatMap(response -> response.bodyToFlux(String.class))
.flatMapMany(response -> response.bodyToFlux(String.class))
.reduce((s1, s2) -> s1 + s2);
StepVerifier.create(result)
......@@ -92,7 +92,7 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest
Flux<String> result = this.webClient.get()
.uri("/write-and-never-complete")
.exchange()
.flatMap(response -> response.bodyToFlux(String.class));
.flatMapMany(response -> response.bodyToFlux(String.class));
StepVerifier.create(result)
.expectNextMatches(s -> s.startsWith("0123456789"))
......
......@@ -99,7 +99,7 @@ public class WebClientIntegrationTests {
.uri("/greeting?name=Spring")
.header("X-Test-Header", "testvalue")
.exchange()
.then(response -> response.bodyToMono(String.class));
.flatMap(response -> response.bodyToMono(String.class));
StepVerifier.create(result)
.expectNext("Hello Spring!")
......@@ -123,7 +123,7 @@ public class WebClientIntegrationTests {
.uri("/json")
.accept(MediaType.APPLICATION_JSON)
.exchange()
.then(response -> response.bodyToMono(String.class));
.flatMap(response -> response.bodyToMono(String.class));
StepVerifier.create(result)
.expectNext(content)
......@@ -248,7 +248,7 @@ public class WebClientIntegrationTests {
.uri("/pojo")
.accept(MediaType.APPLICATION_JSON)
.exchange()
.then(response -> response.bodyToMono(Pojo.class));
.flatMap(response -> response.bodyToMono(Pojo.class));
StepVerifier.create(result)
.consumeNextWith(p -> assertEquals("barbar", p.getBar()))
......@@ -270,7 +270,7 @@ public class WebClientIntegrationTests {
.uri("/pojos")
.accept(MediaType.APPLICATION_JSON)
.exchange()
.flatMap(response -> response.bodyToFlux(Pojo.class));
.flatMapMany(response -> response.bodyToFlux(Pojo.class));
StepVerifier.create(result)
.consumeNextWith(p -> assertThat(p.getBar(), Matchers.is("bar1")))
......@@ -296,7 +296,7 @@ public class WebClientIntegrationTests {
.contentType(MediaType.APPLICATION_JSON)
.body(new Pojo("foofoo", "barbar"))
.exchange()
.then(response -> response.bodyToMono(Pojo.class));
.flatMap(response -> response.bodyToMono(Pojo.class));
StepVerifier.create(result)
.consumeNextWith(p -> assertEquals("BARBAR", p.getBar()))
......@@ -321,7 +321,7 @@ public class WebClientIntegrationTests {
.uri("/test")
.cookie("testkey", "testvalue")
.exchange()
.then(response -> response.bodyToMono(String.class));
.flatMap(response -> response.bodyToMono(String.class));
StepVerifier.create(result)
.expectNext("test")
......@@ -365,7 +365,7 @@ public class WebClientIntegrationTests {
Mono<String> result = filteredClient.get()
.uri("/greeting?name=Spring")
.exchange()
.then(response -> response.bodyToMono(String.class));
.flatMap(response -> response.bodyToMono(String.class));
StepVerifier.create(result)
.expectNext("Hello Spring!")
......@@ -390,7 +390,7 @@ public class WebClientIntegrationTests {
Mono<String> result = filteredClient.get()
.uri("/greeting?name=Spring")
.exchange()
.then(response -> response.bodyToMono(String.class));
.flatMap(response -> response.bodyToMono(String.class));
StepVerifier.create(result)
.expectNext("Hello Spring!")
......
......@@ -273,7 +273,7 @@ public class DefaultServerResponseBuilderTests {
when(exchange.getResponse()).thenReturn(response);
HandlerStrategies strategies = mock(HandlerStrategies.class);
result.then(res -> res.writeTo(exchange, strategies)).block();
result.flatMap(res -> res.writeTo(exchange, strategies)).block();
assertEquals(HttpStatus.CREATED, response.getStatusCode());
assertEquals("MyValue", response.getHeaders().getFirst("MyKey"));
......@@ -290,7 +290,7 @@ public class DefaultServerResponseBuilderTests {
when(exchange.getResponse()).thenReturn(response);
HandlerStrategies strategies = mock(HandlerStrategies.class);
result.then(res -> res.writeTo(exchange, strategies)).block();
result.flatMap(res -> res.writeTo(exchange, strategies)).block();
StepVerifier.create(response.getBody()).expectComplete().verify();
}
......
......@@ -55,7 +55,7 @@ public class ResourceHandlerFunctionTests {
Mono<ServerResponse> responseMono = this.handlerFunction.handle(request);
Mono<Void> result = responseMono.then(response -> {
Mono<Void> result = responseMono.flatMap(response -> {
assertEquals(HttpStatus.OK, response.statusCode());
/*
TODO: enable when ServerEntityResponse is reintroduced
......@@ -94,7 +94,7 @@ public class ResourceHandlerFunctionTests {
Mono<ServerResponse> response = this.handlerFunction.handle(request);
Mono<Void> result = response.then(res -> {
Mono<Void> result = response.flatMap(res -> {
assertEquals(HttpStatus.OK, res.statusCode());
return res.writeTo(exchange, HandlerStrategies.withDefaults());
});
......@@ -114,7 +114,7 @@ public class ResourceHandlerFunctionTests {
ServerRequest request = new DefaultServerRequest(exchange, HandlerStrategies.withDefaults());
Mono<ServerResponse> responseMono = this.handlerFunction.handle(request);
Mono<Void> result = responseMono.then(response -> {
Mono<Void> result = responseMono.flatMap(response -> {
assertEquals(HttpStatus.OK, response.statusCode());
assertEquals(EnumSet.of(HttpMethod.GET, HttpMethod.HEAD, HttpMethod.OPTIONS),
response.headers().getAllow());
......
......@@ -91,7 +91,7 @@ public class RouterFunctionTests {
RouterFunction<EntityResponse<Mono<String>>> routerFunction = request -> Mono.just(handlerFunction);
HandlerFilterFunction<EntityResponse<Mono<String>>, EntityResponse<Mono<Integer>>> filterFunction =
(request, next) -> next.handle(request).then(
(request, next) -> next.handle(request).flatMap(
response -> {
Mono<Integer> intMono = response.entity()
.map(Integer::parseInt);
......@@ -102,7 +102,7 @@ public class RouterFunctionTests {
MockServerRequest request = MockServerRequest.builder().build();
Mono<EntityResponse<Mono<Integer>>> responseMono =
result.route(request).then(hf -> hf.handle(request));
result.route(request).flatMap(hf -> hf.handle(request));
StepVerifier.create(responseMono)
.consumeNextWith(
......
......@@ -63,7 +63,7 @@ public class SseHandlerFunctionIntegrationTests extends AbstractRouterFunctionIn
.uri("/string")
.accept(TEXT_EVENT_STREAM)
.exchange()
.flatMap(response -> response.body(toFlux(String.class)));
.flatMapMany(response -> response.body(toFlux(String.class)));
StepVerifier.create(result)
.expectNext("foo 0")
......@@ -78,7 +78,7 @@ public class SseHandlerFunctionIntegrationTests extends AbstractRouterFunctionIn
.uri("/person")
.accept(TEXT_EVENT_STREAM)
.exchange()
.flatMap(response -> response.body(toFlux(Person.class)));
.flatMapMany(response -> response.body(toFlux(Person.class)));
StepVerifier.create(result)
.expectNext(new Person("foo 0"))
......@@ -93,7 +93,7 @@ public class SseHandlerFunctionIntegrationTests extends AbstractRouterFunctionIn
.uri("/event")
.accept(TEXT_EVENT_STREAM)
.exchange()
.flatMap(response -> response.body(toFlux(
.flatMapMany(response -> response.body(toFlux(
forClassWithGenerics(ServerSentEvent.class, String.class))));
StepVerifier.create(result)
......
......@@ -71,7 +71,7 @@ public class JsonStreamingIntegrationTests extends AbstractHttpHandlerIntegratio
.uri("/stream")
.accept(APPLICATION_STREAM_JSON)
.exchange()
.flatMap(response -> response.bodyToFlux(Person.class));
.flatMapMany(response -> response.bodyToFlux(Person.class));
StepVerifier.create(result)
.expectNext(new Person("foo 0"))
......
......@@ -77,7 +77,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
.uri("/string")
.accept(TEXT_EVENT_STREAM)
.exchange()
.flatMap(response -> response.bodyToFlux(String.class));
.flatMapMany(response -> response.bodyToFlux(String.class));
StepVerifier.create(result)
.expectNext("foo 0")
......@@ -92,7 +92,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
.uri("/person")
.accept(TEXT_EVENT_STREAM)
.exchange()
.flatMap(response -> response.bodyToFlux(Person.class));
.flatMapMany(response -> response.bodyToFlux(Person.class));
StepVerifier.create(result)
.expectNext(new Person("foo 0"))
......@@ -108,7 +108,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
.uri("/event")
.accept(TEXT_EVENT_STREAM)
.exchange()
.flatMap(response -> response.body(toFlux(type)));
.flatMapMany(response -> response.body(toFlux(type)));
StepVerifier.create(result)
.consumeNextWith( event -> {
......@@ -135,7 +135,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
.uri("/event")
.accept(TEXT_EVENT_STREAM)
.exchange()
.flatMap(response -> response.body(toFlux(
.flatMapMany(response -> response.body(toFlux(
forClassWithGenerics(ServerSentEvent.class, String.class))));
StepVerifier.create(result)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册