提交 bcf6f6e7 编写于 作者: R Rossen Stoyanchev

Refactoring in reactive WebSocketSession and adapters

The WebSocketHander adapters are now neutral for client vs server-side
use with the adapters for RxNetty and Reactor Netty (server-side only)
completely removed.

A new HandshakeInfo carries information about the handshake including
URI, headers, and principal from the upgrade strategy, to the adapter,
and then to the session.

WebSocketSession exposes the HandshakeInfo as well reducing its overall
number of methods.
上级 edcf0491
......@@ -16,6 +16,8 @@
package org.springframework.web.reactive.socket;
import java.net.URI;
import java.security.Principal;
import java.util.Optional;
import java.util.function.Function;
import org.reactivestreams.Publisher;
......@@ -24,6 +26,8 @@ import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.socket.adapter.HandshakeInfo;
/**
* Representation for a WebSocket session.
......@@ -39,13 +43,13 @@ public interface WebSocketSession {
String getId();
/**
* Return the WebSocket endpoint URI.
* Return information from the handshake request.
*/
URI getUri();
HandshakeInfo getHandshakeInfo();
/**
* Return a {@link DataBufferFactory} that can be used for creating message payloads.
* @return a buffer factory
* Return a {@code DataBuffer} Factory to create message payloads.
* @return the buffer factory for the session
*/
DataBufferFactory bufferFactory();
......
......@@ -17,7 +17,6 @@
package org.springframework.web.reactive.socket.adapter;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Publisher;
......@@ -55,8 +54,10 @@ public abstract class AbstractListenerWebSocketSession<T> extends WebSocketSessi
private final AtomicBoolean sendCalled = new AtomicBoolean();
public AbstractListenerWebSocketSession(T delegate, String id, URI uri, DataBufferFactory bufferFactory) {
super(delegate, id, uri, bufferFactory);
public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo handshakeInfo,
DataBufferFactory bufferFactory) {
super(delegate, id, handshakeInfo, bufferFactory);
}
......
......@@ -15,38 +15,54 @@
*/
package org.springframework.web.reactive.socket.adapter;
import io.reactivex.netty.protocol.http.ws.WebSocketConnection;
import java.net.URI;
import java.security.Principal;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.RxReactiveStreams;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.http.HttpHeaders;
import org.springframework.util.Assert;
/**
* RxNetty {@code WebSocketHandler} implementation adapting and delegating to a
* Spring {@link WebSocketHandler}.
* Simple container of information from a WebSocket handshake request.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public class RxNettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport
implements io.reactivex.netty.protocol.http.ws.server.WebSocketHandler {
public class HandshakeInfo {
private final URI uri;
private final HttpHeaders headers;
private final Mono<Principal> principal;
public RxNettyWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler handler) {
super(request, response, handler);
public HandshakeInfo(URI uri, HttpHeaders headers, Mono<Principal> principal) {
Assert.notNull(uri, "URI is required.");
Assert.notNull(headers, "HttpHeaders are required.");
Assert.notNull(principal, "Prinicpal is required.");
this.uri = uri;
this.headers = headers;
this.principal = principal;
}
public URI getUri() {
return this.uri;
}
public HttpHeaders getHeaders() {
return this.headers;
}
public Mono<Principal> getPrincipal() {
return this.principal;
}
@Override
public Observable<Void> handle(WebSocketConnection conn) {
RxNettyWebSocketSession session = new RxNettyWebSocketSession(conn, getUri(), getBufferFactory());
Mono<Void> result = getDelegate().handle(session);
return RxReactiveStreams.toObservable(result);
public String toString() {
return "HandshakeInfo[uri=" + this.uri + ", headers=" + this.headers + "]";
}
}
......@@ -32,8 +32,7 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
......@@ -54,17 +53,16 @@ public class JettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport
private JettyWebSocketSession session;
public JettyWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response,
public JettyWebSocketHandlerAdapter(HandshakeInfo handshakeInfo, DataBufferFactory bufferFactory,
WebSocketHandler delegate) {
super(request, response, delegate);
super(handshakeInfo, bufferFactory, delegate);
}
@OnWebSocketConnect
public void onWebSocketConnect(Session session) {
this.session = new JettyWebSocketSession(session, getUri(), getBufferFactory());
this.session = new JettyWebSocketSession(session, getHandshakeInfo(), getBufferFactory());
HandlerResultSubscriber subscriber = new HandlerResultSubscriber();
getDelegate().handle(this.session).subscribe(subscriber);
}
......
......@@ -17,7 +17,6 @@
package org.springframework.web.reactive.socket.adapter;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
......@@ -41,8 +40,8 @@ import org.springframework.web.reactive.socket.WebSocketSession;
public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Session> {
public JettyWebSocketSession(Session session, URI uri, DataBufferFactory bufferFactory) {
super(session, ObjectUtils.getIdentityHexString(session), uri, bufferFactory);
public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory bufferFactory) {
super(session, ObjectUtils.getIdentityHexString(session), info, bufferFactory);
}
......
......@@ -15,7 +15,6 @@
*/
package org.springframework.web.reactive.socket.adapter;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -54,8 +53,8 @@ public abstract class NettyWebSocketSessionSupport<T> extends WebSocketSessionSu
}
protected NettyWebSocketSessionSupport(T delegate, URI uri, NettyDataBufferFactory bufferFactory) {
super(delegate, ObjectUtils.getIdentityHexString(delegate), uri, bufferFactory);
protected NettyWebSocketSessionSupport(T delegate, HandshakeInfo info, NettyDataBufferFactory factory) {
super(delegate, ObjectUtils.getIdentityHexString(delegate), info, factory);
}
......
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.adapter;
import java.util.function.BiFunction;
import org.reactivestreams.Publisher;
import reactor.ipc.netty.http.websocket.WebsocketInbound;
import reactor.ipc.netty.http.websocket.WebsocketOutbound;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.reactive.socket.WebSocketHandler;
/**
* Reactor Netty {@code WebSocketHandler} implementation adapting and
* delegating to a Spring {@link WebSocketHandler}.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public class ReactorNettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport
implements BiFunction<WebsocketInbound, WebsocketOutbound, Publisher<Void>> {
public ReactorNettyWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler handler) {
super(request, response, handler);
}
@Override
public Publisher<Void> apply(WebsocketInbound inbound, WebsocketOutbound outbound) {
ReactorNettyWebSocketSession session =
new ReactorNettyWebSocketSession(inbound, outbound, getUri(), getBufferFactory());
return getDelegate().handle(session);
}
}
......@@ -15,8 +15,6 @@
*/
package org.springframework.web.reactive.socket.adapter;
import java.net.URI;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
......@@ -42,10 +40,10 @@ public class ReactorNettyWebSocketSession
extends NettyWebSocketSessionSupport<ReactorNettyWebSocketSession.WebSocketConnection> {
protected ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound,
URI uri, NettyDataBufferFactory factory) {
public ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound,
HandshakeInfo handshakeInfo, NettyDataBufferFactory bufferFactory) {
super(new WebSocketConnection(inbound, outbound), uri, factory);
super(new WebSocketConnection(inbound, outbound), handshakeInfo, bufferFactory);
}
......
......@@ -16,8 +16,6 @@
package org.springframework.web.reactive.socket.adapter;
import java.net.URI;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.reactivex.netty.protocol.http.ws.WebSocketConnection;
import org.reactivestreams.Publisher;
......@@ -41,8 +39,8 @@ import org.springframework.web.reactive.socket.WebSocketSession;
public class RxNettyWebSocketSession extends NettyWebSocketSessionSupport<WebSocketConnection> {
public RxNettyWebSocketSession(WebSocketConnection conn, URI uri, NettyDataBufferFactory factory) {
super(conn, uri, factory);
public RxNettyWebSocketSession(WebSocketConnection conn, HandshakeInfo info, NettyDataBufferFactory factory) {
super(conn, info, factory);
}
......
......@@ -28,8 +28,7 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
......@@ -47,10 +46,10 @@ public class TomcatWebSocketHandlerAdapter extends WebSocketHandlerAdapterSuppor
private TomcatWebSocketSession session;
public TomcatWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response,
public TomcatWebSocketHandlerAdapter(HandshakeInfo handshakeInfo, DataBufferFactory bufferFactory,
WebSocketHandler delegate) {
super(request, response, delegate);
super(handshakeInfo, bufferFactory, delegate);
}
......@@ -67,8 +66,9 @@ public class TomcatWebSocketHandlerAdapter extends WebSocketHandlerAdapterSuppor
@Override
public void onOpen(Session session, EndpointConfig config) {
TomcatWebSocketHandlerAdapter.this.session =
new TomcatWebSocketSession(session, getUri(), getBufferFactory());
TomcatWebSocketHandlerAdapter.this.session = new TomcatWebSocketSession(
session, getHandshakeInfo(), getBufferFactory());
session.addMessageHandler(String.class, message -> {
WebSocketMessage webSocketMessage = toMessage(message);
......
......@@ -17,7 +17,6 @@
package org.springframework.web.reactive.socket.adapter;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import javax.websocket.CloseReason;
......@@ -43,8 +42,8 @@ import org.springframework.web.reactive.socket.WebSocketSession;
public class TomcatWebSocketSession extends AbstractListenerWebSocketSession<Session> {
public TomcatWebSocketSession(Session session, URI uri, DataBufferFactory bufferFactory) {
super(session, session.getId(), uri, bufferFactory);
public TomcatWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory bufferFactory) {
super(session, session.getId(), info, bufferFactory);
}
......
......@@ -30,8 +30,7 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
......@@ -50,16 +49,16 @@ public class UndertowWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupp
private UndertowWebSocketSession session;
public UndertowWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response,
public UndertowWebSocketHandlerAdapter(HandshakeInfo handshakeInfo, DataBufferFactory bufferFactory,
WebSocketHandler delegate) {
super(request, response, delegate);
super(handshakeInfo, bufferFactory, delegate);
}
@Override
public void onConnect(WebSocketHttpExchange exchange, WebSocketChannel channel) {
this.session = new UndertowWebSocketSession(channel, getUri(), getBufferFactory());
this.session = new UndertowWebSocketSession(channel, getHandshakeInfo(), getBufferFactory());
channel.getReceiveSetter().set(new UndertowReceiveListener());
channel.resumeReceives();
......
......@@ -17,7 +17,6 @@
package org.springframework.web.reactive.socket.adapter;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
......@@ -43,8 +42,10 @@ import org.springframework.web.reactive.socket.WebSocketSession;
public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<WebSocketChannel> {
public UndertowWebSocketSession(WebSocketChannel channel, URI url, DataBufferFactory bufferFactory) {
super(channel, ObjectUtils.getIdentityHexString(channel), url, bufferFactory);
public UndertowWebSocketSession(WebSocketChannel channel, HandshakeInfo handshakeInfo,
DataBufferFactory bufferFactory) {
super(channel, ObjectUtils.getIdentityHexString(channel), handshakeInfo, bufferFactory);
}
......
......@@ -15,44 +15,41 @@
*/
package org.springframework.web.reactive.socket.adapter;
import java.net.URI;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.WebSocketHandler;
/**
* Base class for {@link WebSocketHandler} adapters to underlying WebSocket
* handler APIs.
* Base class for {@link WebSocketHandler} adapters to WebSocket handler APIs
* of underlying runtimes.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public abstract class WebSocketHandlerAdapterSupport {
private final URI uri;
private final HandshakeInfo handshakeInfo;
private final WebSocketHandler delegate;
private final DataBufferFactory bufferFactory;
protected WebSocketHandlerAdapterSupport(ServerHttpRequest request, ServerHttpResponse response,
protected WebSocketHandlerAdapterSupport(HandshakeInfo handshakeInfo, DataBufferFactory bufferFactory,
WebSocketHandler handler) {
Assert.notNull("ServerHttpRequest is required");
Assert.notNull("ServerHttpResponse is required");
Assert.notNull("WebSocketHandler handler is required");
this.uri = request.getURI();
this.bufferFactory = response.bufferFactory();
Assert.notNull(handshakeInfo, "HandshakeInfo is required.");
Assert.notNull(bufferFactory, "DataBufferFactory is required");
Assert.notNull(handler, "WebSocketHandler handler is required");
this.handshakeInfo = handshakeInfo;
this.bufferFactory = bufferFactory;
this.delegate = handler;
}
protected URI getUri() {
return this.uri;
protected HandshakeInfo getHandshakeInfo() {
return this.handshakeInfo;
}
protected WebSocketHandler getDelegate() {
......
......@@ -16,12 +16,13 @@
package org.springframework.web.reactive.socket.adapter;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
......@@ -48,23 +49,25 @@ public abstract class WebSocketSessionSupport<T> implements WebSocketSession {
private final String id;
private final URI uri;
private final HandshakeInfo handshakeInfo;
private final DataBufferFactory bufferFactory;
/**
* Create a new instance and associate the given attributes with it.
* @param delegate the underlying WebSocket connection
*/
protected WebSocketSessionSupport(T delegate, String id, URI uri, DataBufferFactory bufferFactory) {
protected WebSocketSessionSupport(T delegate, String id, HandshakeInfo handshakeInfo,
DataBufferFactory bufferFactory) {
Assert.notNull(delegate, "Native session is required.");
Assert.notNull(id, "'id' is required.");
Assert.notNull(uri, "URI is required.");
Assert.notNull(bufferFactory, "DataBufferFactory is required.");
Assert.notNull(id, "Session id is required.");
Assert.notNull(handshakeInfo, "HandshakeInfo is required.");
Assert.notNull(bufferFactory, "DataBuffer factory is required.");
this.delegate = delegate;
this.id = id;
this.uri = uri;
this.handshakeInfo = handshakeInfo;
this.bufferFactory = bufferFactory;
}
......@@ -82,8 +85,18 @@ public abstract class WebSocketSessionSupport<T> implements WebSocketSession {
}
@Override
public URI getUri() {
return this.uri;
public HandshakeInfo getHandshakeInfo() {
return this.handshakeInfo;
}
@Override
public Flux<WebSocketMessage> receive() {
return null;
}
@Override
public Mono<Void> send(Publisher<WebSocketMessage> messages) {
return null;
}
@Override
......@@ -129,7 +142,7 @@ public abstract class WebSocketSessionSupport<T> implements WebSocketSession {
@Override
public String toString() {
return getClass().getSimpleName() + "[id=" + getId() + ", uri=" + getUri() + "]";
return getClass().getSimpleName() + "[id=" + getId() + ", uri=" + getHandshakeInfo().getUri() + "]";
}
}
......@@ -17,6 +17,8 @@
package org.springframework.web.reactive.socket.server.upgrade;
import java.io.IOException;
import java.net.URI;
import java.security.Principal;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
......@@ -27,12 +29,15 @@ import reactor.core.publisher.Mono;
import org.springframework.context.Lifecycle;
import org.springframework.core.NamedThreadLocal;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServletServerHttpRequest;
import org.springframework.http.server.reactive.ServletServerHttpResponse;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.adapter.HandshakeInfo;
import org.springframework.web.reactive.socket.adapter.JettyWebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.server.ServerWebExchange;
......@@ -99,13 +104,21 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Life
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler) {
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse response = exchange.getResponse();
JettyWebSocketHandlerAdapter adapter = new JettyWebSocketHandlerAdapter(request, response, handler);
HttpServletRequest servletRequest = getHttpServletRequest(request);
HttpServletResponse servletResponse = getHttpServletResponse(response);
URI uri = request.getURI();
HttpHeaders headers = request.getHeaders();
Mono<Principal> principal = exchange.getPrincipal();
HandshakeInfo info = new HandshakeInfo(uri, headers, principal);
DataBufferFactory bufferFactory = response.bufferFactory();
JettyWebSocketHandlerAdapter adapter = new JettyWebSocketHandlerAdapter(info, bufferFactory, handler);
startLazily(servletRequest);
boolean isUpgrade = this.factory.isUpgradeRequest(servletRequest, servletResponse);
......
......@@ -15,15 +15,20 @@
*/
package org.springframework.web.reactive.socket.server.upgrade;
import java.net.URI;
import java.security.Principal;
import java.util.List;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.reactive.ReactorServerHttpRequest;
import org.springframework.http.server.reactive.ReactorServerHttpResponse;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.adapter.HandshakeInfo;
import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.server.ServerWebExchange;
......@@ -36,18 +41,23 @@ import org.springframework.web.server.ServerWebExchange;
public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrategy {
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler webSocketHandler) {
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler) {
ReactorServerHttpRequest request = (ReactorServerHttpRequest) exchange.getRequest();
ReactorServerHttpResponse response = (ReactorServerHttpResponse) exchange.getResponse();
ReactorNettyWebSocketHandlerAdapter reactorHandler =
new ReactorNettyWebSocketHandlerAdapter(request, response, webSocketHandler);
URI uri = request.getURI();
HttpHeaders headers = request.getHeaders();
Mono<Principal> principal = exchange.getPrincipal();
HandshakeInfo handshakeInfo = new HandshakeInfo(uri, headers, principal);
NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory) response.bufferFactory();
String protocols = StringUtils.arrayToCommaDelimitedString(getSubProtocols(webSocketHandler));
String protocols = StringUtils.arrayToCommaDelimitedString(getSubProtocols(handler));
protocols = (StringUtils.hasText(protocols) ? protocols : null);
return response.getReactorResponse().sendWebsocket(protocols, reactorHandler);
return response.getReactorResponse().sendWebsocket(protocols,
(inbound, outbound) -> handler.handle(
new ReactorNettyWebSocketSession(inbound, outbound, handshakeInfo, bufferFactory)));
}
private static String[] getSubProtocols(WebSocketHandler webSocketHandler) {
......
......@@ -15,17 +15,22 @@
*/
package org.springframework.web.reactive.socket.server.upgrade;
import java.net.URI;
import java.security.Principal;
import java.util.List;
import java.util.Map;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.RxReactiveStreams;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.reactive.RxNettyServerHttpRequest;
import org.springframework.http.server.reactive.RxNettyServerHttpResponse;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.adapter.RxNettyWebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.adapter.HandshakeInfo;
import org.springframework.web.reactive.socket.adapter.RxNettyWebSocketSession;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.server.ServerWebExchange;
......@@ -37,18 +42,25 @@ import org.springframework.web.server.ServerWebExchange;
*/
public class RxNettyRequestUpgradeStrategy implements RequestUpgradeStrategy {
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler webSocketHandler) {
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler) {
RxNettyServerHttpRequest request = (RxNettyServerHttpRequest) exchange.getRequest();
RxNettyServerHttpResponse response = (RxNettyServerHttpResponse) exchange.getResponse();
RxNettyWebSocketHandlerAdapter rxNettyHandler =
new RxNettyWebSocketHandlerAdapter(request, response, webSocketHandler);
URI uri = request.getURI();
HttpHeaders headers = request.getHeaders();
Mono<Principal> principal = exchange.getPrincipal();
HandshakeInfo handshakeInfo = new HandshakeInfo(uri, headers, principal);
NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory) response.bufferFactory();
Observable<Void> completion = response.getRxNettyResponse()
.acceptWebSocketUpgrade(rxNettyHandler)
.subprotocol(getSubProtocols(webSocketHandler));
.acceptWebSocketUpgrade(conn -> {
WebSocketSession session = new RxNettyWebSocketSession(conn, handshakeInfo, bufferFactory);
return RxReactiveStreams.toObservable(handler.handle(session));
})
.subprotocol(getSubProtocols(handler));
return Mono.from(RxReactiveStreams.toPublisher(completion));
}
......
......@@ -17,9 +17,9 @@
package org.springframework.web.reactive.socket.server.upgrade;
import java.io.IOException;
import java.net.URI;
import java.security.Principal;
import java.util.Collections;
import java.util.Map;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
......@@ -28,18 +28,21 @@ import javax.websocket.Endpoint;
import javax.websocket.server.ServerEndpointConfig;
import org.apache.tomcat.websocket.server.WsServerContainer;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServletServerHttpRequest;
import org.springframework.http.server.reactive.ServletServerHttpResponse;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.adapter.HandshakeInfo;
import org.springframework.web.reactive.socket.adapter.TomcatWebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
/**
* A {@link RequestUpgradeStrategy} for use with Tomcat.
*
......@@ -56,11 +59,17 @@ public class TomcatRequestUpgradeStrategy implements RequestUpgradeStrategy {
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse response = exchange.getResponse();
Endpoint endpoint = new TomcatWebSocketHandlerAdapter(request, response, handler).getEndpoint();
HttpServletRequest servletRequest = getHttpServletRequest(request);
HttpServletResponse servletResponse = getHttpServletResponse(response);
URI uri = request.getURI();
HttpHeaders headers = request.getHeaders();
Mono<Principal> principal = exchange.getPrincipal();
HandshakeInfo info = new HandshakeInfo(uri, headers, principal);
DataBufferFactory bufferFactory = response.bufferFactory();
Endpoint endpoint = new TomcatWebSocketHandlerAdapter(info, bufferFactory, handler).getEndpoint();
String requestURI = servletRequest.getRequestURI();
ServerEndpointConfig config = new ServerEndpointRegistration(requestURI, endpoint);
try {
......
......@@ -16,11 +16,17 @@
package org.springframework.web.reactive.socket.server.upgrade;
import java.net.URI;
import java.security.Principal;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.UndertowServerHttpRequest;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.adapter.HandshakeInfo;
import org.springframework.web.reactive.socket.adapter.UndertowWebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.server.ServerWebExchange;
......@@ -38,12 +44,21 @@ import reactor.core.publisher.Mono;
*/
public class UndertowRequestUpgradeStrategy implements RequestUpgradeStrategy {
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler) {
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse response = exchange.getResponse();
WebSocketConnectionCallback callback = new UndertowWebSocketHandlerAdapter(request, response, handler);
URI uri = request.getURI();
HttpHeaders headers = request.getHeaders();
Mono<Principal> principal = exchange.getPrincipal();
HandshakeInfo info = new HandshakeInfo(uri, headers, principal);
DataBufferFactory bufferFactory = response.bufferFactory();
WebSocketConnectionCallback callback =
new UndertowWebSocketHandlerAdapter(info, bufferFactory, handler);
Assert.isTrue(request instanceof UndertowServerHttpRequest);
HttpServerExchange httpExchange = ((UndertowServerHttpRequest) request).getUndertowExchange();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册