diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/socket/AbstractWebSocketIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/socket/AbstractWebSocketIntegrationTests.java index c81e3a5f9c42c6e5120aa1a085b096f8a83cf1a3..43a73fd13f32c6f22cc1423cfa96a51a3374c6dd 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/socket/AbstractWebSocketIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/socket/AbstractWebSocketIntegrationTests.java @@ -34,6 +34,7 @@ import org.junit.jupiter.params.provider.MethodSource; import org.xnio.OptionMap; import org.xnio.Xnio; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.util.function.Tuple3; import org.springframework.context.ApplicationContext; @@ -98,11 +99,19 @@ public abstract class AbstractWebSocketIntegrationTests { servers.put(new ReactorHttpServer(), ReactorNettyConfig.class); servers.put(new UndertowHttpServer(), UndertowConfig.class); - Flux f1 = Flux.fromArray(clients).concatMap(c -> Flux.just(c).repeat(servers.size() - 1)); - Flux f2 = Flux.fromIterable(servers.keySet()).repeat(clients.length); - Flux> f3 = Flux.fromIterable(servers.values()).repeat(clients.length); + // Try each client once against each server.. - return Flux.zip(f1, f2, f3).map(Tuple3::toArray).collectList().block() + Flux f1 = Flux.fromArray(clients) + .concatMap(c -> Mono.just(c).repeat(servers.size() - 1)); + + Flux>> f2 = Flux.fromIterable(servers.entrySet()) + .repeat(clients.length - 1) + .share(); + + return Flux.zip(f1, f2.map(Map.Entry::getKey), f2.map(Map.Entry::getValue)) + .map(Tuple3::toArray) + .collectList() + .block() .toArray(new Object[clients.length * servers.size()][2]); }