From 1243556047a737536d69098ea1130180bced231c Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Sun, 18 Dec 2016 10:52:17 -0500 Subject: [PATCH] Add Reactor Netty WebSocketClient support Issue: SPR-14527 --- .../adapter/ReactorNettyWebSocketSession.java | 23 ++--- .../client/ReactorNettyWebSocketClient.java | 90 +++++++++++++++++++ .../server/WebSocketIntegrationTests.java | 24 +++-- 3 files changed, 120 insertions(+), 17 deletions(-) create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java index 660497c148..986e739ae8 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java @@ -19,9 +19,9 @@ import io.netty.handler.codec.http.websocketx.WebSocketFrame; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.ipc.netty.NettyInbound; +import reactor.ipc.netty.NettyOutbound; import reactor.ipc.netty.NettyPipeline; -import reactor.ipc.netty.http.websocket.WebsocketInbound; -import reactor.ipc.netty.http.websocket.WebsocketOutbound; import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.web.reactive.socket.CloseStatus; @@ -41,7 +41,7 @@ public class ReactorNettyWebSocketSession extends NettyWebSocketSessionSupport { - public ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound, + public ReactorNettyWebSocketSession(NettyInbound inbound, NettyOutbound outbound, HandshakeInfo handshakeInfo, NettyDataBufferFactory bufferFactory) { super(new WebSocketConnection(inbound, outbound), handshakeInfo, bufferFactory); @@ -50,14 +50,14 @@ public class ReactorNettyWebSocketSession @Override public Flux receive() { - WebsocketInbound inbound = getDelegate().getWebsocketInbound(); + NettyInbound inbound = getDelegate().getInbound(); return toMessageFlux(inbound.receiveObject().cast(WebSocketFrame.class)); } @Override public Mono send(Publisher messages) { Flux frameFlux = Flux.from(messages).map(this::toFrame); - WebsocketOutbound outbound = getDelegate().getWebsocketOutbound(); + NettyOutbound outbound = getDelegate().getOutbound(); return outbound.options(NettyPipeline.SendOptions::flushOnEach) .sendObject(frameFlux) .then(); @@ -72,24 +72,25 @@ public class ReactorNettyWebSocketSession /** - * Simple container for {@link WebsocketInbound} and {@link WebsocketOutbound}. + * Simple container for {@link NettyInbound} and {@link NettyOutbound}. */ public static class WebSocketConnection { - private final WebsocketInbound inbound; + private final NettyInbound inbound; - private final WebsocketOutbound outbound; + private final NettyOutbound outbound; - public WebSocketConnection(WebsocketInbound inbound, WebsocketOutbound outbound) { + + public WebSocketConnection(NettyInbound inbound, NettyOutbound outbound) { this.inbound = inbound; this.outbound = outbound; } - public WebsocketInbound getWebsocketInbound() { + public NettyInbound getInbound() { return this.inbound; } - public WebsocketOutbound getWebsocketOutbound() { + public NettyOutbound getOutbound() { return this.outbound; } } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java new file mode 100644 index 0000000000..bbb9cdf054 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java @@ -0,0 +1,90 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.reactive.socket.client; + +import java.net.URI; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import io.netty.buffer.ByteBufAllocator; +import reactor.core.publisher.Mono; +import reactor.ipc.netty.NettyOutbound; +import reactor.ipc.netty.http.client.HttpClient; +import reactor.ipc.netty.http.client.HttpClientOptions; +import reactor.ipc.netty.http.client.HttpClientRequest; + +import org.springframework.core.io.buffer.NettyDataBufferFactory; +import org.springframework.http.HttpHeaders; +import org.springframework.web.reactive.socket.HandshakeInfo; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.WebSocketSession; +import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession; + +/** + * A {@link WebSocketClient} based on Reactor Netty. + * + * @author Rossen Stoyanchev + * @since 5.0 + */ +public class ReactorNettyWebSocketClient implements WebSocketClient { + + private final HttpClient httpClient; + + + public ReactorNettyWebSocketClient() { + this.httpClient = HttpClient.create(); + } + + public ReactorNettyWebSocketClient(Consumer clientOptions) { + this.httpClient = HttpClient.create(clientOptions); + } + + + @Override + public Mono execute(URI url, WebSocketHandler handler) { + return execute(url, new HttpHeaders(), handler); + } + + @Override + public Mono execute(URI url, HttpHeaders headers, WebSocketHandler handler) { + + // We have to store the NettyOutbound fow now.. + // The alternative HttpClientResponse#receiveWebSocket does not work at present + AtomicReference outboundRef = new AtomicReference<>(); + + return this.httpClient + .get(url.toString(), request -> { + addHeaders(request, headers); + NettyOutbound outbound = request.sendWebsocket(); + outboundRef.set(outbound); + return outbound; + }) + .then(inbound -> { + ByteBufAllocator allocator = inbound.channel().alloc(); + NettyDataBufferFactory factory = new NettyDataBufferFactory(allocator); + NettyOutbound outbound = outboundRef.get(); + HandshakeInfo info = new HandshakeInfo(url, headers, Mono.empty()); + WebSocketSession session = new ReactorNettyWebSocketSession(inbound, outbound, info, factory); + return handler.handle(session); + }); + } + + private void addHeaders(HttpClientRequest request, HttpHeaders headers) { + headers.entrySet().stream() + .forEach(e -> request.requestHeaders().set(e.getKey(), e.getValue())); + } + +} diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/WebSocketIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/WebSocketIntegrationTests.java index 71b0e1471d..e860f10d31 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/WebSocketIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/WebSocketIntegrationTests.java @@ -15,6 +15,7 @@ */ package org.springframework.web.reactive.socket.server; +import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; @@ -29,7 +30,9 @@ import org.springframework.web.reactive.HandlerMapping; import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping; import org.springframework.web.reactive.socket.WebSocketHandler; import org.springframework.web.reactive.socket.WebSocketSession; +import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient; import org.springframework.web.reactive.socket.client.RxNettyWebSocketClient; +import org.springframework.web.reactive.socket.client.WebSocketClient; import static org.junit.Assert.assertEquals; @@ -48,13 +51,22 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests @Test - public void echo() throws Exception { + public void echoReactorNettyClient() throws Exception { + testEcho(new ReactorNettyWebSocketClient()); + } + + @Test + public void echoRxNettyClient() throws Exception { + testEcho(new RxNettyWebSocketClient()); + } + + private void testEcho(WebSocketClient client) throws URISyntaxException { int count = 100; Flux input = Flux.range(1, count).map(index -> "msg-" + index); - ReplayProcessor emitter = ReplayProcessor.create(count); + ReplayProcessor output = ReplayProcessor.create(count); - new RxNettyWebSocketClient() - .execute(getUrl("/echo"), session -> session + client.execute(getUrl("/echo"), + session -> session .send(input.map(session::textMessage)) .thenMany(session.receive() .take(count) @@ -63,11 +75,11 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests message.release(); return text; })) - .subscribeWith(emitter) + .subscribeWith(output) .then()) .blockMillis(5000); - assertEquals(input.collectList().blockMillis(5000), emitter.collectList().blockMillis(5000)); + assertEquals(input.collectList().blockMillis(5000), output.collectList().blockMillis(5000)); } -- GitLab