diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java index 5f8720cb183027a903c87b06468e74a7a4c78271..9601edf1ce04e948ab037b5f500b41c33f4fdc81 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java @@ -16,6 +16,8 @@ package org.springframework.web.reactive.socket; import java.net.URI; +import java.security.Principal; +import java.util.Optional; import java.util.function.Function; import org.reactivestreams.Publisher; @@ -24,6 +26,8 @@ import reactor.core.publisher.Mono; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.http.HttpHeaders; +import org.springframework.web.reactive.socket.adapter.HandshakeInfo; /** * Representation for a WebSocket session. @@ -39,13 +43,13 @@ public interface WebSocketSession { String getId(); /** - * Return the WebSocket endpoint URI. + * Return information from the handshake request. */ - URI getUri(); + HandshakeInfo getHandshakeInfo(); /** - * Return a {@link DataBufferFactory} that can be used for creating message payloads. - * @return a buffer factory + * Return a {@code DataBuffer} Factory to create message payloads. + * @return the buffer factory for the session */ DataBufferFactory bufferFactory(); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java index 30e2576b458e3dd74bd7a8b233d0b9b24cbeb294..c0c11b02300e2ce156366fe964eb430198507d4b 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java @@ -17,7 +17,6 @@ package org.springframework.web.reactive.socket.adapter; import java.io.IOException; -import java.net.URI; import java.util.concurrent.atomic.AtomicBoolean; import org.reactivestreams.Publisher; @@ -55,8 +54,10 @@ public abstract class AbstractListenerWebSocketSession extends WebSocketSessi private final AtomicBoolean sendCalled = new AtomicBoolean(); - public AbstractListenerWebSocketSession(T delegate, String id, URI uri, DataBufferFactory bufferFactory) { - super(delegate, id, uri, bufferFactory); + public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo handshakeInfo, + DataBufferFactory bufferFactory) { + + super(delegate, id, handshakeInfo, bufferFactory); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/HandshakeInfo.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/HandshakeInfo.java new file mode 100644 index 0000000000000000000000000000000000000000..b66a0c809e1df08b94fa3ff3682364b8dedf05f2 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/HandshakeInfo.java @@ -0,0 +1,68 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.reactive.socket.adapter; + +import java.net.URI; +import java.security.Principal; + +import reactor.core.publisher.Mono; + +import org.springframework.http.HttpHeaders; +import org.springframework.util.Assert; + +/** + * Simple container of information from a WebSocket handshake request. + * + * @author Rossen Stoyanchev + * @since 5.0 + */ +public class HandshakeInfo { + + private final URI uri; + + private final HttpHeaders headers; + + private final Mono principal; + + + public HandshakeInfo(URI uri, HttpHeaders headers, Mono principal) { + Assert.notNull(uri, "URI is required."); + Assert.notNull(headers, "HttpHeaders are required."); + Assert.notNull(principal, "Prinicpal is required."); + this.uri = uri; + this.headers = headers; + this.principal = principal; + } + + + public URI getUri() { + return this.uri; + } + + public HttpHeaders getHeaders() { + return this.headers; + } + + public Mono getPrincipal() { + return this.principal; + } + + @Override + public String toString() { + return "HandshakeInfo[uri=" + this.uri + ", headers=" + this.headers + "]"; + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java index 540ec8eac7ba5b26d4bed3cdc33a67aa286be587..ffb3e38a736de1890896cdfe5f754776594b7481 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java @@ -32,8 +32,7 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.http.server.reactive.ServerHttpRequest; -import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.WebSocketHandler; import org.springframework.web.reactive.socket.WebSocketMessage; @@ -54,17 +53,16 @@ public class JettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport private JettyWebSocketSession session; - public JettyWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response, + public JettyWebSocketHandlerAdapter(HandshakeInfo handshakeInfo, DataBufferFactory bufferFactory, WebSocketHandler delegate) { - super(request, response, delegate); + super(handshakeInfo, bufferFactory, delegate); } @OnWebSocketConnect public void onWebSocketConnect(Session session) { - this.session = new JettyWebSocketSession(session, getUri(), getBufferFactory()); - + this.session = new JettyWebSocketSession(session, getHandshakeInfo(), getBufferFactory()); HandlerResultSubscriber subscriber = new HandlerResultSubscriber(); getDelegate().handle(this.session).subscribe(subscriber); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java index 99715f5f9da9b5a1813d28b94c6266719d9a9b6e..7aa00a6a2ded99a0b89e224ad98e58fdf3d30212 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java @@ -17,7 +17,6 @@ package org.springframework.web.reactive.socket.adapter; import java.io.IOException; -import java.net.URI; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -41,8 +40,8 @@ import org.springframework.web.reactive.socket.WebSocketSession; public class JettyWebSocketSession extends AbstractListenerWebSocketSession { - public JettyWebSocketSession(Session session, URI uri, DataBufferFactory bufferFactory) { - super(session, ObjectUtils.getIdentityHexString(session), uri, bufferFactory); + public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory bufferFactory) { + super(session, ObjectUtils.getIdentityHexString(session), info, bufferFactory); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java index b4a758233894b55b3596bfb52caa8f1f4794a466..00f863d4bfe659e32f845c83181380d939c7b844 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java @@ -15,7 +15,6 @@ */ package org.springframework.web.reactive.socket.adapter; -import java.net.URI; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -54,8 +53,8 @@ public abstract class NettyWebSocketSessionSupport extends WebSocketSessionSu } - protected NettyWebSocketSessionSupport(T delegate, URI uri, NettyDataBufferFactory bufferFactory) { - super(delegate, ObjectUtils.getIdentityHexString(delegate), uri, bufferFactory); + protected NettyWebSocketSessionSupport(T delegate, HandshakeInfo info, NettyDataBufferFactory factory) { + super(delegate, ObjectUtils.getIdentityHexString(delegate), info, factory); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketHandlerAdapter.java deleted file mode 100644 index 7c5f90e299e5290fbe818ccfc1bc5b46d29bdf8b..0000000000000000000000000000000000000000 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketHandlerAdapter.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2002-2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.web.reactive.socket.adapter; - -import java.util.function.BiFunction; - -import org.reactivestreams.Publisher; -import reactor.ipc.netty.http.websocket.WebsocketInbound; -import reactor.ipc.netty.http.websocket.WebsocketOutbound; - -import org.springframework.http.server.reactive.ServerHttpRequest; -import org.springframework.http.server.reactive.ServerHttpResponse; -import org.springframework.web.reactive.socket.WebSocketHandler; - -/** - * Reactor Netty {@code WebSocketHandler} implementation adapting and - * delegating to a Spring {@link WebSocketHandler}. - * - * @author Rossen Stoyanchev - * @since 5.0 - */ -public class ReactorNettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport - implements BiFunction> { - - - public ReactorNettyWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response, - WebSocketHandler handler) { - - super(request, response, handler); - } - - - @Override - public Publisher apply(WebsocketInbound inbound, WebsocketOutbound outbound) { - ReactorNettyWebSocketSession session = - new ReactorNettyWebSocketSession(inbound, outbound, getUri(), getBufferFactory()); - return getDelegate().handle(session); - } - -} diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java index 08b97a0369f2ab965c1c9f37277ec0507e4f9054..e5f0ded566a4e07d6a8600df51ca6b61bf543b18 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java @@ -15,8 +15,6 @@ */ package org.springframework.web.reactive.socket.adapter; -import java.net.URI; - import io.netty.handler.codec.http.websocketx.WebSocketFrame; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; @@ -42,10 +40,10 @@ public class ReactorNettyWebSocketSession extends NettyWebSocketSessionSupport { - protected ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound, - URI uri, NettyDataBufferFactory factory) { + public ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound, + HandshakeInfo handshakeInfo, NettyDataBufferFactory bufferFactory) { - super(new WebSocketConnection(inbound, outbound), uri, factory); + super(new WebSocketConnection(inbound, outbound), handshakeInfo, bufferFactory); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketHandlerAdapter.java deleted file mode 100644 index d9896ad43ca62268336aa6eae86abfcdea9474d2..0000000000000000000000000000000000000000 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketHandlerAdapter.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright 2002-2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.web.reactive.socket.adapter; - -import io.reactivex.netty.protocol.http.ws.WebSocketConnection; -import reactor.core.publisher.Mono; -import rx.Observable; -import rx.RxReactiveStreams; - -import org.springframework.http.server.reactive.ServerHttpRequest; -import org.springframework.http.server.reactive.ServerHttpResponse; -import org.springframework.web.reactive.socket.WebSocketHandler; - -/** - * RxNetty {@code WebSocketHandler} implementation adapting and delegating to a - * Spring {@link WebSocketHandler}. - * - * @author Rossen Stoyanchev - * @since 5.0 - */ -public class RxNettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport - implements io.reactivex.netty.protocol.http.ws.server.WebSocketHandler { - - - public RxNettyWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response, - WebSocketHandler handler) { - - super(request, response, handler); - } - - - @Override - public Observable handle(WebSocketConnection conn) { - RxNettyWebSocketSession session = new RxNettyWebSocketSession(conn, getUri(), getBufferFactory()); - Mono result = getDelegate().handle(session); - return RxReactiveStreams.toObservable(result); - } - -} diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java index 4b708675165e610fbf9cdeb514cdacf9dd972be1..8c7f1f1f5b6f7ee7292809205b1a956da8ecc77b 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java @@ -16,8 +16,6 @@ package org.springframework.web.reactive.socket.adapter; -import java.net.URI; - import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.reactivex.netty.protocol.http.ws.WebSocketConnection; import org.reactivestreams.Publisher; @@ -41,8 +39,8 @@ import org.springframework.web.reactive.socket.WebSocketSession; public class RxNettyWebSocketSession extends NettyWebSocketSessionSupport { - public RxNettyWebSocketSession(WebSocketConnection conn, URI uri, NettyDataBufferFactory factory) { - super(conn, uri, factory); + public RxNettyWebSocketSession(WebSocketConnection conn, HandshakeInfo info, NettyDataBufferFactory factory) { + super(conn, info, factory); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java index cd1af4728ad715e7ae7c30d74e312781dc6f3dbf..a32bd8bdf7228d7d5aab905f3a06263e1c5c7a56 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java @@ -28,8 +28,7 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.http.server.reactive.ServerHttpRequest; -import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.WebSocketHandler; import org.springframework.web.reactive.socket.WebSocketMessage; @@ -47,10 +46,10 @@ public class TomcatWebSocketHandlerAdapter extends WebSocketHandlerAdapterSuppor private TomcatWebSocketSession session; - public TomcatWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response, + public TomcatWebSocketHandlerAdapter(HandshakeInfo handshakeInfo, DataBufferFactory bufferFactory, WebSocketHandler delegate) { - super(request, response, delegate); + super(handshakeInfo, bufferFactory, delegate); } @@ -67,8 +66,9 @@ public class TomcatWebSocketHandlerAdapter extends WebSocketHandlerAdapterSuppor @Override public void onOpen(Session session, EndpointConfig config) { - TomcatWebSocketHandlerAdapter.this.session = - new TomcatWebSocketSession(session, getUri(), getBufferFactory()); + + TomcatWebSocketHandlerAdapter.this.session = new TomcatWebSocketSession( + session, getHandshakeInfo(), getBufferFactory()); session.addMessageHandler(String.class, message -> { WebSocketMessage webSocketMessage = toMessage(message); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java index 72fd76d8b409b5d2d8c96130e075621cabde4c4a..b7537ff446646d2050c5edfccb8864323fda9888 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java @@ -17,7 +17,6 @@ package org.springframework.web.reactive.socket.adapter; import java.io.IOException; -import java.net.URI; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import javax.websocket.CloseReason; @@ -43,8 +42,8 @@ import org.springframework.web.reactive.socket.WebSocketSession; public class TomcatWebSocketSession extends AbstractListenerWebSocketSession { - public TomcatWebSocketSession(Session session, URI uri, DataBufferFactory bufferFactory) { - super(session, session.getId(), uri, bufferFactory); + public TomcatWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory bufferFactory) { + super(session, session.getId(), info, bufferFactory); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java index 691c4395b0b14b9719f46984f9788ee00294fb7f..80643a7c8a9b1a3c6ffb299af4e50503f383ec0e 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java @@ -30,8 +30,7 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.http.server.reactive.ServerHttpRequest; -import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.WebSocketHandler; import org.springframework.web.reactive.socket.WebSocketMessage; @@ -50,16 +49,16 @@ public class UndertowWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupp private UndertowWebSocketSession session; - public UndertowWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response, + public UndertowWebSocketHandlerAdapter(HandshakeInfo handshakeInfo, DataBufferFactory bufferFactory, WebSocketHandler delegate) { - super(request, response, delegate); + super(handshakeInfo, bufferFactory, delegate); } @Override public void onConnect(WebSocketHttpExchange exchange, WebSocketChannel channel) { - this.session = new UndertowWebSocketSession(channel, getUri(), getBufferFactory()); + this.session = new UndertowWebSocketSession(channel, getHandshakeInfo(), getBufferFactory()); channel.getReceiveSetter().set(new UndertowReceiveListener()); channel.resumeReceives(); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java index 46d3432033af59eab02a76452c78efe2614074ae..163272dfa9fe0ae18eb0045218c4eeb5e025749e 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java @@ -17,7 +17,6 @@ package org.springframework.web.reactive.socket.adapter; import java.io.IOException; -import java.net.URI; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -43,8 +42,10 @@ import org.springframework.web.reactive.socket.WebSocketSession; public class UndertowWebSocketSession extends AbstractListenerWebSocketSession { - public UndertowWebSocketSession(WebSocketChannel channel, URI url, DataBufferFactory bufferFactory) { - super(channel, ObjectUtils.getIdentityHexString(channel), url, bufferFactory); + public UndertowWebSocketSession(WebSocketChannel channel, HandshakeInfo handshakeInfo, + DataBufferFactory bufferFactory) { + + super(channel, ObjectUtils.getIdentityHexString(channel), handshakeInfo, bufferFactory); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketHandlerAdapterSupport.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketHandlerAdapterSupport.java index 4a36ee2e0f98163431dacbf2dbe3816806ede1b7..bbd38ea9677fcbabf90039def1cf47112d9abdd9 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketHandlerAdapterSupport.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketHandlerAdapterSupport.java @@ -15,44 +15,41 @@ */ package org.springframework.web.reactive.socket.adapter; -import java.net.URI; - import org.springframework.core.io.buffer.DataBufferFactory; -import org.springframework.http.server.reactive.ServerHttpRequest; -import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.util.Assert; import org.springframework.web.reactive.socket.WebSocketHandler; /** - * Base class for {@link WebSocketHandler} adapters to underlying WebSocket - * handler APIs. + * Base class for {@link WebSocketHandler} adapters to WebSocket handler APIs + * of underlying runtimes. * * @author Rossen Stoyanchev * @since 5.0 */ public abstract class WebSocketHandlerAdapterSupport { - private final URI uri; + private final HandshakeInfo handshakeInfo; private final WebSocketHandler delegate; private final DataBufferFactory bufferFactory; - protected WebSocketHandlerAdapterSupport(ServerHttpRequest request, ServerHttpResponse response, + protected WebSocketHandlerAdapterSupport(HandshakeInfo handshakeInfo, DataBufferFactory bufferFactory, WebSocketHandler handler) { - Assert.notNull("ServerHttpRequest is required"); - Assert.notNull("ServerHttpResponse is required"); - Assert.notNull("WebSocketHandler handler is required"); - this.uri = request.getURI(); - this.bufferFactory = response.bufferFactory(); + Assert.notNull(handshakeInfo, "HandshakeInfo is required."); + Assert.notNull(bufferFactory, "DataBufferFactory is required"); + Assert.notNull(handler, "WebSocketHandler handler is required"); + + this.handshakeInfo = handshakeInfo; + this.bufferFactory = bufferFactory; this.delegate = handler; } - protected URI getUri() { - return this.uri; + protected HandshakeInfo getHandshakeInfo() { + return this.handshakeInfo; } protected WebSocketHandler getDelegate() { diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java index b50dc7257c72cfb53b39620f74c5cecf0a995cb7..a277d372c2dbb25e9532859ea515eed787afd4db 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java @@ -16,12 +16,13 @@ package org.springframework.web.reactive.socket.adapter; -import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.function.Function; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.core.io.buffer.DataBuffer; @@ -48,23 +49,25 @@ public abstract class WebSocketSessionSupport implements WebSocketSession { private final String id; - private final URI uri; + private final HandshakeInfo handshakeInfo; private final DataBufferFactory bufferFactory; /** * Create a new instance and associate the given attributes with it. - * @param delegate the underlying WebSocket connection */ - protected WebSocketSessionSupport(T delegate, String id, URI uri, DataBufferFactory bufferFactory) { + protected WebSocketSessionSupport(T delegate, String id, HandshakeInfo handshakeInfo, + DataBufferFactory bufferFactory) { + Assert.notNull(delegate, "Native session is required."); - Assert.notNull(id, "'id' is required."); - Assert.notNull(uri, "URI is required."); - Assert.notNull(bufferFactory, "DataBufferFactory is required."); + Assert.notNull(id, "Session id is required."); + Assert.notNull(handshakeInfo, "HandshakeInfo is required."); + Assert.notNull(bufferFactory, "DataBuffer factory is required."); + this.delegate = delegate; this.id = id; - this.uri = uri; + this.handshakeInfo = handshakeInfo; this.bufferFactory = bufferFactory; } @@ -82,8 +85,18 @@ public abstract class WebSocketSessionSupport implements WebSocketSession { } @Override - public URI getUri() { - return this.uri; + public HandshakeInfo getHandshakeInfo() { + return this.handshakeInfo; + } + + @Override + public Flux receive() { + return null; + } + + @Override + public Mono send(Publisher messages) { + return null; } @Override @@ -129,7 +142,7 @@ public abstract class WebSocketSessionSupport implements WebSocketSession { @Override public String toString() { - return getClass().getSimpleName() + "[id=" + getId() + ", uri=" + getUri() + "]"; + return getClass().getSimpleName() + "[id=" + getId() + ", uri=" + getHandshakeInfo().getUri() + "]"; } } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java index 3332d46d144d582ca51a90303cd9754381079f01..a824962d7e1058b79d20530859578394c7d1a920 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java @@ -17,6 +17,8 @@ package org.springframework.web.reactive.socket.server.upgrade; import java.io.IOException; +import java.net.URI; +import java.security.Principal; import javax.servlet.ServletContext; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -27,12 +29,15 @@ import reactor.core.publisher.Mono; import org.springframework.context.Lifecycle; import org.springframework.core.NamedThreadLocal; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.http.HttpHeaders; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.http.server.reactive.ServletServerHttpRequest; import org.springframework.http.server.reactive.ServletServerHttpResponse; import org.springframework.util.Assert; import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.adapter.HandshakeInfo; import org.springframework.web.reactive.socket.adapter.JettyWebSocketHandlerAdapter; import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; import org.springframework.web.server.ServerWebExchange; @@ -99,13 +104,21 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Life @Override public Mono upgrade(ServerWebExchange exchange, WebSocketHandler handler) { + ServerHttpRequest request = exchange.getRequest(); ServerHttpResponse response = exchange.getResponse(); - JettyWebSocketHandlerAdapter adapter = new JettyWebSocketHandlerAdapter(request, response, handler); HttpServletRequest servletRequest = getHttpServletRequest(request); HttpServletResponse servletResponse = getHttpServletResponse(response); + URI uri = request.getURI(); + HttpHeaders headers = request.getHeaders(); + Mono principal = exchange.getPrincipal(); + HandshakeInfo info = new HandshakeInfo(uri, headers, principal); + DataBufferFactory bufferFactory = response.bufferFactory(); + + JettyWebSocketHandlerAdapter adapter = new JettyWebSocketHandlerAdapter(info, bufferFactory, handler); + startLazily(servletRequest); boolean isUpgrade = this.factory.isUpgradeRequest(servletRequest, servletResponse); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java index 9f8531862df9a6a7a25a26f564d8ccf022afb5b8..eed578139bf4541885bcf7c71e02ce5a7f54eb08 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java @@ -15,15 +15,20 @@ */ package org.springframework.web.reactive.socket.server.upgrade; +import java.net.URI; +import java.security.Principal; import java.util.List; import reactor.core.publisher.Mono; +import org.springframework.core.io.buffer.NettyDataBufferFactory; +import org.springframework.http.HttpHeaders; import org.springframework.http.server.reactive.ReactorServerHttpRequest; import org.springframework.http.server.reactive.ReactorServerHttpResponse; import org.springframework.util.StringUtils; import org.springframework.web.reactive.socket.WebSocketHandler; -import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketHandlerAdapter; +import org.springframework.web.reactive.socket.adapter.HandshakeInfo; +import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession; import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; import org.springframework.web.server.ServerWebExchange; @@ -36,18 +41,23 @@ import org.springframework.web.server.ServerWebExchange; public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrategy { @Override - public Mono upgrade(ServerWebExchange exchange, WebSocketHandler webSocketHandler) { + public Mono upgrade(ServerWebExchange exchange, WebSocketHandler handler) { ReactorServerHttpRequest request = (ReactorServerHttpRequest) exchange.getRequest(); ReactorServerHttpResponse response = (ReactorServerHttpResponse) exchange.getResponse(); - ReactorNettyWebSocketHandlerAdapter reactorHandler = - new ReactorNettyWebSocketHandlerAdapter(request, response, webSocketHandler); + URI uri = request.getURI(); + HttpHeaders headers = request.getHeaders(); + Mono principal = exchange.getPrincipal(); + HandshakeInfo handshakeInfo = new HandshakeInfo(uri, headers, principal); + NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory) response.bufferFactory(); - String protocols = StringUtils.arrayToCommaDelimitedString(getSubProtocols(webSocketHandler)); + String protocols = StringUtils.arrayToCommaDelimitedString(getSubProtocols(handler)); protocols = (StringUtils.hasText(protocols) ? protocols : null); - return response.getReactorResponse().sendWebsocket(protocols, reactorHandler); + return response.getReactorResponse().sendWebsocket(protocols, + (inbound, outbound) -> handler.handle( + new ReactorNettyWebSocketSession(inbound, outbound, handshakeInfo, bufferFactory))); } private static String[] getSubProtocols(WebSocketHandler webSocketHandler) { diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/RxNettyRequestUpgradeStrategy.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/RxNettyRequestUpgradeStrategy.java index b923f381665be4d1b2222affe9e58fb25f860e6f..46c09d6cf327b8c89721e2cf366bcc2acea18ce5 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/RxNettyRequestUpgradeStrategy.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/RxNettyRequestUpgradeStrategy.java @@ -15,17 +15,22 @@ */ package org.springframework.web.reactive.socket.server.upgrade; +import java.net.URI; +import java.security.Principal; import java.util.List; -import java.util.Map; import reactor.core.publisher.Mono; import rx.Observable; import rx.RxReactiveStreams; +import org.springframework.core.io.buffer.NettyDataBufferFactory; +import org.springframework.http.HttpHeaders; import org.springframework.http.server.reactive.RxNettyServerHttpRequest; import org.springframework.http.server.reactive.RxNettyServerHttpResponse; import org.springframework.web.reactive.socket.WebSocketHandler; -import org.springframework.web.reactive.socket.adapter.RxNettyWebSocketHandlerAdapter; +import org.springframework.web.reactive.socket.WebSocketSession; +import org.springframework.web.reactive.socket.adapter.HandshakeInfo; +import org.springframework.web.reactive.socket.adapter.RxNettyWebSocketSession; import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; import org.springframework.web.server.ServerWebExchange; @@ -37,18 +42,25 @@ import org.springframework.web.server.ServerWebExchange; */ public class RxNettyRequestUpgradeStrategy implements RequestUpgradeStrategy { + @Override - public Mono upgrade(ServerWebExchange exchange, WebSocketHandler webSocketHandler) { + public Mono upgrade(ServerWebExchange exchange, WebSocketHandler handler) { RxNettyServerHttpRequest request = (RxNettyServerHttpRequest) exchange.getRequest(); RxNettyServerHttpResponse response = (RxNettyServerHttpResponse) exchange.getResponse(); - RxNettyWebSocketHandlerAdapter rxNettyHandler = - new RxNettyWebSocketHandlerAdapter(request, response, webSocketHandler); + URI uri = request.getURI(); + HttpHeaders headers = request.getHeaders(); + Mono principal = exchange.getPrincipal(); + HandshakeInfo handshakeInfo = new HandshakeInfo(uri, headers, principal); + NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory) response.bufferFactory(); Observable completion = response.getRxNettyResponse() - .acceptWebSocketUpgrade(rxNettyHandler) - .subprotocol(getSubProtocols(webSocketHandler)); + .acceptWebSocketUpgrade(conn -> { + WebSocketSession session = new RxNettyWebSocketSession(conn, handshakeInfo, bufferFactory); + return RxReactiveStreams.toObservable(handler.handle(session)); + }) + .subprotocol(getSubProtocols(handler)); return Mono.from(RxReactiveStreams.toPublisher(completion)); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/TomcatRequestUpgradeStrategy.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/TomcatRequestUpgradeStrategy.java index dca97965519a50532b5d858521eb1503decb6a21..4cab4575d5247d0655e297af70fba015ab1caae0 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/TomcatRequestUpgradeStrategy.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/TomcatRequestUpgradeStrategy.java @@ -17,9 +17,9 @@ package org.springframework.web.reactive.socket.server.upgrade; import java.io.IOException; +import java.net.URI; +import java.security.Principal; import java.util.Collections; -import java.util.Map; - import javax.servlet.ServletContext; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; @@ -28,18 +28,21 @@ import javax.websocket.Endpoint; import javax.websocket.server.ServerEndpointConfig; import org.apache.tomcat.websocket.server.WsServerContainer; +import reactor.core.publisher.Mono; + +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.http.HttpHeaders; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.http.server.reactive.ServletServerHttpRequest; import org.springframework.http.server.reactive.ServletServerHttpResponse; import org.springframework.util.Assert; import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.adapter.HandshakeInfo; import org.springframework.web.reactive.socket.adapter.TomcatWebSocketHandlerAdapter; import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; import org.springframework.web.server.ServerWebExchange; -import reactor.core.publisher.Mono; - /** * A {@link RequestUpgradeStrategy} for use with Tomcat. * @@ -56,11 +59,17 @@ public class TomcatRequestUpgradeStrategy implements RequestUpgradeStrategy { ServerHttpRequest request = exchange.getRequest(); ServerHttpResponse response = exchange.getResponse(); - Endpoint endpoint = new TomcatWebSocketHandlerAdapter(request, response, handler).getEndpoint(); HttpServletRequest servletRequest = getHttpServletRequest(request); HttpServletResponse servletResponse = getHttpServletResponse(response); + URI uri = request.getURI(); + HttpHeaders headers = request.getHeaders(); + Mono principal = exchange.getPrincipal(); + HandshakeInfo info = new HandshakeInfo(uri, headers, principal); + DataBufferFactory bufferFactory = response.bufferFactory(); + Endpoint endpoint = new TomcatWebSocketHandlerAdapter(info, bufferFactory, handler).getEndpoint(); + String requestURI = servletRequest.getRequestURI(); ServerEndpointConfig config = new ServerEndpointRegistration(requestURI, endpoint); try { diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/UndertowRequestUpgradeStrategy.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/UndertowRequestUpgradeStrategy.java index ec13e2b6854dfcb569dca46b118f1f953f26ba77..6fae7acb14cfd14a41caba6d50c7440742f9cbfb 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/UndertowRequestUpgradeStrategy.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/UndertowRequestUpgradeStrategy.java @@ -16,11 +16,17 @@ package org.springframework.web.reactive.socket.server.upgrade; +import java.net.URI; +import java.security.Principal; + +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.http.HttpHeaders; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.http.server.reactive.UndertowServerHttpRequest; import org.springframework.util.Assert; import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.adapter.HandshakeInfo; import org.springframework.web.reactive.socket.adapter.UndertowWebSocketHandlerAdapter; import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; import org.springframework.web.server.ServerWebExchange; @@ -38,12 +44,21 @@ import reactor.core.publisher.Mono; */ public class UndertowRequestUpgradeStrategy implements RequestUpgradeStrategy { + @Override public Mono upgrade(ServerWebExchange exchange, WebSocketHandler handler) { ServerHttpRequest request = exchange.getRequest(); ServerHttpResponse response = exchange.getResponse(); - WebSocketConnectionCallback callback = new UndertowWebSocketHandlerAdapter(request, response, handler); + + URI uri = request.getURI(); + HttpHeaders headers = request.getHeaders(); + Mono principal = exchange.getPrincipal(); + HandshakeInfo info = new HandshakeInfo(uri, headers, principal); + DataBufferFactory bufferFactory = response.bufferFactory(); + + WebSocketConnectionCallback callback = + new UndertowWebSocketHandlerAdapter(info, bufferFactory, handler); Assert.isTrue(request instanceof UndertowServerHttpRequest); HttpServerExchange httpExchange = ((UndertowServerHttpRequest) request).getUndertowExchange();