提交 637b6387 编写于 作者: R Rossen Stoyanchev

Starting point for reactive WebSocket support

Includes basic abstractions and an RxNetty support to start.

Issue: SPR-14527
上级 8662b777
......@@ -802,6 +802,11 @@ project("spring-web-reactive") {
optional("org.freemarker:freemarker:${freemarkerVersion}")
optional "org.apache.httpcomponents:httpclient:${httpclientVersion}"
optional('org.webjars:webjars-locator:0.32')
optional("io.reactivex:rxnetty-http:${rxnettyVersion}") {
exclude group: 'io.reactivex', module: 'rxjava'
}
optional("io.reactivex:rxjava:${rxjavaVersion}")
optional("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}")
testCompile("io.projectreactor.addons:reactor-test:${reactorCoreVersion}")
testCompile("javax.validation:validation-api:${beanvalVersion}")
testCompile("org.hibernate:hibernate-validator:${hibval5Version}")
......@@ -810,12 +815,7 @@ project("spring-web-reactive") {
testCompile("org.eclipse.jetty:jetty-server:${jettyVersion}")
testCompile("org.eclipse.jetty:jetty-servlet:${jettyVersion}")
testCompile("io.projectreactor.ipc:reactor-netty:${reactorNettyVersion}")
testCompile("io.reactivex:rxnetty-http:${rxnettyVersion}") {
exclude group: 'io.reactivex', module: 'rxjava'
}
testCompile("io.reactivex:rxjava:${rxjavaVersion}")
testCompile "io.reactivex.rxjava2:rxjava:${rxjava2Version}"
testCompile("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}")
testCompile("io.undertow:undertow-core:${undertowVersion}")
testCompile("org.jboss.xnio:xnio-api:${xnioVersion}")
testCompile("com.fasterxml:aalto-xml:1.0.0")
......
/*
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
/**
* Representation of WebSocket "close" status codes and reasons. Status codes
* in the 1xxx range are pre-defined by the protocol.
*
* <p>See <a href="https://tools.ietf.org/html/rfc6455#section-7.4.1">
* RFC 6455, Section 7.4.1 "Defined Status Codes"</a>.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public final class CloseStatus {
/**
* "1000 indicates a normal closure, meaning that the purpose for which the connection
* was established has been fulfilled."
*/
public static final CloseStatus NORMAL = new CloseStatus(1000);
/**
* "1001 indicates that an endpoint is "going away", such as a server going down or a
* browser having navigated away from a page."
*/
public static final CloseStatus GOING_AWAY = new CloseStatus(1001);
/**
* "1002 indicates that an endpoint is terminating the connection due to a protocol
* error."
*/
public static final CloseStatus PROTOCOL_ERROR = new CloseStatus(1002);
/**
* "1003 indicates that an endpoint is terminating the connection because it has
* received a type of data it cannot accept (e.g., an endpoint that understands only
* text data MAY send this if it receives a binary message)."
*/
public static final CloseStatus NOT_ACCEPTABLE = new CloseStatus(1003);
// 10004: Reserved.
// The specific meaning might be defined in the future.
/**
* "1005 is a reserved value and MUST NOT be set as a status code in a Close control
* frame by an endpoint. It is designated for use in applications expecting a status
* code to indicate that no status code was actually present."
*/
public static final CloseStatus NO_STATUS_CODE = new CloseStatus(1005);
/**
* "1006 is a reserved value and MUST NOT be set as a status code in a Close control
* frame by an endpoint. It is designated for use in applications expecting a status
* code to indicate that the connection was closed abnormally, e.g., without sending
* or receiving a Close control frame."
*/
public static final CloseStatus NO_CLOSE_FRAME = new CloseStatus(1006);
/**
* "1007 indicates that an endpoint is terminating the connection because it has
* received data within a message that was not consistent with the type of the message
* (e.g., non-UTF-8 [RFC3629] data within a text message)."
*/
public static final CloseStatus BAD_DATA = new CloseStatus(1007);
/**
* "1008 indicates that an endpoint is terminating the connection because it has
* received a message that violates its policy. This is a generic status code that can
* be returned when there is no other more suitable status code (e.g., 1003 or 1009)
* or if there is a need to hide specific details about the policy."
*/
public static final CloseStatus POLICY_VIOLATION = new CloseStatus(1008);
/**
* "1009 indicates that an endpoint is terminating the connection because it has
* received a message that is too big for it to process."
*/
public static final CloseStatus TOO_BIG_TO_PROCESS = new CloseStatus(1009);
/**
* "1010 indicates that an endpoint (client) is terminating the connection because it
* has expected the server to negotiate one or more extension, but the server didn't
* return them in the response message of the WebSocket handshake. The list of
* extensions that are needed SHOULD appear in the /reason/ part of the Close frame.
* Note that this status code is not used by the server, because it can fail the
* WebSocket handshake instead."
*/
public static final CloseStatus REQUIRED_EXTENSION = new CloseStatus(1010);
/**
* "1011 indicates that a server is terminating the connection because it encountered
* an unexpected condition that prevented it from fulfilling the request."
*/
public static final CloseStatus SERVER_ERROR = new CloseStatus(1011);
/**
* "1012 indicates that the service is restarted. A client may reconnect, and if it
* chooses to do, should reconnect using a randomized delay of 5 - 30s."
*/
public static final CloseStatus SERVICE_RESTARTED = new CloseStatus(1012);
/**
* "1013 indicates that the service is experiencing overload. A client should only
* connect to a different IP (when there are multiple for the target) or reconnect to
* the same IP upon user action."
*/
public static final CloseStatus SERVICE_OVERLOAD = new CloseStatus(1013);
/**
* "1015 is a reserved value and MUST NOT be set as a status code in a Close control
* frame by an endpoint. It is designated for use in applications expecting a status
* code to indicate that the connection was closed due to a failure to perform a TLS
* handshake (e.g., the server certificate can't be verified)."
*/
public static final CloseStatus TLS_HANDSHAKE_FAILURE = new CloseStatus(1015);
private final int code;
private final String reason;
/**
* Create a new {@link CloseStatus} instance.
* @param code the status code
*/
public CloseStatus(int code) {
this(code, null);
}
/**
* Create a new {@link CloseStatus} instance.
* @param code the status code
* @param reason the reason
*/
public CloseStatus(int code, String reason) {
Assert.isTrue((code >= 1000 && code < 5000), "Invalid status code");
this.code = code;
this.reason = reason;
}
/**
* Return the status code.
*/
public int getCode() {
return this.code;
}
/**
* Return the reason, or {@code null} if none.
*/
public String getReason() {
return this.reason;
}
/**
* Create a new {@link CloseStatus} from this one with the specified reason.
* @param reason the reason
* @return a new {@link CloseStatus} instance
*/
public CloseStatus withReason(String reason) {
Assert.hasText(reason, "Reason must not be empty");
return new CloseStatus(this.code, reason);
}
public boolean equalsCode(CloseStatus other) {
return (this.code == other.code);
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (!(other instanceof CloseStatus)) {
return false;
}
CloseStatus otherStatus = (CloseStatus) other;
return (this.code == otherStatus.code &&
ObjectUtils.nullSafeEquals(this.reason, otherStatus.reason));
}
@Override
public int hashCode() {
return this.code * 29 + ObjectUtils.nullSafeHashCode(this.reason);
}
@Override
public String toString() {
return "CloseStatus[code=" + this.code + ", reason=" + this.reason + "]";
}
}
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket;
import java.util.Collections;
import java.util.List;
import reactor.core.publisher.Mono;
/**
* Handler for a WebSocket-style session interaction.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public interface WebSocketHandler {
/**
* Return the list of sub-protocols supported by this handler.
* <p>By default an empty list is returned.
*/
default List<String> getSubProtocols() {
return Collections.emptyList();
}
/**
* Handle the given WebSocket session.
* @param session the session
* @return signals completion for session handling
*/
Mono<Void> handle(WebSocketSession session);
}
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
/**
* Representation of a WebSocket message.
* Use one of the static factory methods in this class to create a message.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public class WebSocketMessage {
private final Type type;
private final DataBuffer payload;
/**
* Private constructor. See static factory methods.
*/
private WebSocketMessage(Type type, DataBuffer payload) {
Assert.notNull(type, "'type' must not be null");
Assert.notNull(payload, "'payload' must not be null");
this.type = type;
this.payload = payload;
}
/**
* Return the message type (text, binary, etc).
*/
public Type getType() {
return this.type;
}
/**
* Return the message payload.
*/
public DataBuffer getPayload() {
return this.payload;
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (!(other instanceof WebSocketMessage)) {
return false;
}
WebSocketMessage otherMessage = (WebSocketMessage) other;
return (this.type.equals(otherMessage.type) &&
ObjectUtils.nullSafeEquals(this.payload, otherMessage.payload));
}
@Override
public int hashCode() {
return this.type.hashCode() * 29 + this.payload.hashCode();
}
/**
* Factory method to create a text WebSocket message.
*/
public static WebSocketMessage text(DataBuffer payload) {
return create(Type.TEXT, payload);
}
/**
* Factory method to create a binary WebSocket message.
*/
public static WebSocketMessage binary(DataBuffer payload) {
return create(Type.BINARY, payload);
}
/**
* Factory method to create a ping WebSocket message.
*/
public static WebSocketMessage ping(DataBuffer payload) {
return create(Type.PING, payload);
}
/**
* Factory method to create a pong WebSocket message.
*/
public static WebSocketMessage pong(DataBuffer payload) {
return create(Type.PONG, payload);
}
/**
* Factory method to create a WebSocket message of the given type.
*/
public static WebSocketMessage create(Type type, DataBuffer payload) {
return new WebSocketMessage(type, payload);
}
/**
* WebSocket message types.
*/
public enum Type { TEXT, BINARY, PING, PONG }
}
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket;
import java.net.URI;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
/**
* Representation for a WebSocket session.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public interface WebSocketSession {
/**
* Return the id for the session.
*/
String getId();
/**
* Return the WebSocket endpoint URI.
*/
URI getUri();
/**
* Get the flux of incoming messages.
* <p><strong>Note:</strong> the caller of this method is responsible for
* releasing the DataBuffer payload of each message after consuming it
* on runtimes where a {@code PooledByteBuffer} is used such as Netty.
* @see org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer)
*/
Flux<WebSocketMessage> receive();
/**
* Write the given messages to the WebSocket connection.
* @param messages the messages to write
*/
Mono<Void> send(Publisher<WebSocketMessage> messages);
/**
* Close the WebSocket session with {@link CloseStatus#NORMAL}.
*/
default Mono<Void> close() {
return close(CloseStatus.NORMAL);
}
/**
* Close the WebSocket session with the given status.
* @param status the close status
*/
Mono<Void> close(CloseStatus status);
}
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.adapter;
import java.net.URI;
import io.reactivex.netty.protocol.http.ws.WebSocketConnection;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.RxReactiveStreams;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.WebSocketHandler;
/**
* RxNetty {@code WebSocketHandler} implementation adapting and delegating to a
* Spring {@link WebSocketHandler}.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public class RxNettyWebSocketHandlerAdapter
implements io.reactivex.netty.protocol.http.ws.server.WebSocketHandler {
private final URI uri;
private final NettyDataBufferFactory bufferFactory;
private final WebSocketHandler handler;
public RxNettyWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler handler) {
Assert.notNull("'request' is required");
Assert.notNull("'response' is required");
Assert.notNull("'handler' handler is required");
this.uri = request.getURI();
this.bufferFactory = (NettyDataBufferFactory) response.bufferFactory();
this.handler = handler;
}
@Override
public Observable<Void> handle(WebSocketConnection connection) {
Mono<Void> result = this.handler.handle(createSession(connection));
return RxReactiveStreams.toObservable(result);
}
private RxNettyWebSocketSession createSession(WebSocketConnection conn) {
return new RxNettyWebSocketSession(conn, this.uri, this.bufferFactory);
}
}
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.adapter;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.reactivex.netty.protocol.http.ws.WebSocketConnection;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.RxReactiveStreams;
import org.springframework.core.io.buffer.NettyDataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketMessage;
/**
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public class RxNettyWebSocketSession extends WebSocketSessionSupport<WebSocketConnection> {
private static final Map<Class<?>, WebSocketMessage.Type> MESSAGE_TYPES;
static {
MESSAGE_TYPES = new HashMap<>(4);
MESSAGE_TYPES.put(TextWebSocketFrame.class, WebSocketMessage.Type.TEXT);
MESSAGE_TYPES.put(BinaryWebSocketFrame.class, WebSocketMessage.Type.BINARY);
MESSAGE_TYPES.put(PingWebSocketFrame.class, WebSocketMessage.Type.PING);
MESSAGE_TYPES.put(PongWebSocketFrame.class, WebSocketMessage.Type.PONG);
}
private final String id;
private final URI uri;
private final NettyDataBufferFactory bufferFactory;
public RxNettyWebSocketSession(WebSocketConnection conn, URI uri, NettyDataBufferFactory factory) {
super(conn);
Assert.notNull(uri, "'uri' is required.");
Assert.notNull(uri, "'bufferFactory' is required.");
this.id = ObjectUtils.getIdentityHexString(getDelegate());
this.uri = uri;
this.bufferFactory = factory;
}
@Override
public String getId() {
return this.id;
}
@Override
public URI getUri() {
return this.uri;
}
@Override
public Flux<WebSocketMessage> receive() {
return Flux.from(RxReactiveStreams.toPublisher(getDelegate().getInput()))
.filter(frame -> !(frame instanceof CloseWebSocketFrame))
.window()
.concatMap(flux -> flux.takeUntil(WebSocketFrame::isFinalFragment).buffer())
.map(this::toMessage);
}
@SuppressWarnings("OptionalGetWithoutIsPresent")
private WebSocketMessage toMessage(List<WebSocketFrame> frames) {
Class<?> frameType = frames.get(0).getClass();
if (frames.size() == 1) {
NettyDataBuffer buffer = this.bufferFactory.wrap(frames.get(0).content());
return WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer);
}
return frames.stream()
.map(socketFrame -> bufferFactory.wrap(socketFrame.content()))
.reduce(NettyDataBuffer::write)
.map(buffer -> WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer))
.get();
}
@Override
public Mono<Void> send(Publisher<WebSocketMessage> messages) {
Observable<WebSocketFrame> frames = RxReactiveStreams.toObservable(messages).map(this::toFrame);
Observable<Void> completion = getDelegate().write(frames);
return Mono.from(RxReactiveStreams.toPublisher(completion));
}
private WebSocketFrame toFrame(WebSocketMessage message) {
ByteBuf byteBuf = NettyDataBufferFactory.toByteBuf(message.getPayload());
if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
return new TextWebSocketFrame(byteBuf);
}
else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
return new BinaryWebSocketFrame(byteBuf);
}
else if (WebSocketMessage.Type.PING.equals(message.getType())) {
return new PingWebSocketFrame(byteBuf);
}
else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
return new PongWebSocketFrame(byteBuf);
}
else {
throw new IllegalArgumentException("Unexpected message type: " + message.getType());
}
}
@Override
protected Mono<Void> closeInternal(CloseStatus status) {
return Mono.from(RxReactiveStreams.toPublisher(getDelegate().close()));
}
}
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.adapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketSession;
/**
* Base class for {@link WebSocketSession} implementations wrapping and
* delegating to the native WebSocket session (or connection) of the underlying
* WebSocket runtime.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public abstract class WebSocketSessionSupport<T> implements WebSocketSession {
protected final Log logger = LogFactory.getLog(getClass());
private final T delegate;
/**
* Create a new instance and associate the given attributes with it.
* @param delegate the underlying WebSocket connection
*/
protected WebSocketSessionSupport(T delegate) {
Assert.notNull(delegate, "'delegate' session is required.");
this.delegate = delegate;
}
/**
* Return the native session of the underlying runtime.
*/
public T getDelegate() {
return this.delegate;
}
@Override
public final Mono<Void> close(CloseStatus status) {
if (logger.isDebugEnabled()) {
logger.debug("Closing " + this);
}
return closeInternal(status);
}
protected abstract Mono<Void> closeInternal(CloseStatus status);
@Override
public String toString() {
return getClass().getSimpleName() + "[id=" + getId() + ", uri=" + getUri() + "]";
}
}
/**
* Classes adapting Spring's Reactive WebSocket API to and from WebSocket runtimes.
*/
package org.springframework.web.reactive.socket.adapter;
/**
* Abstractions and support classes for WebSocket interactions.
*/
package org.springframework.web.reactive.socket;
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.server;
import java.util.Map;
import reactor.core.publisher.Mono;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.server.ServerWebExchange;
/**
* A strategy for upgrading an HTTP request to a WebSocket interaction depending
* on the underlying HTTP runtime.
*
* <p>Typically there is one such strategy for every {@link ServerHttpRequest}
* and {@link ServerHttpResponse} implementation type except in the case of
* Servlet containers for which there is no standard API to upgrade a request.
* JSR-356 does have programmatic endpoint registration but that is only
* intended for use on startup and not per request.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public interface RequestUpgradeStrategy {
/**
* Upgrade the request to a WebSocket interaction and adapt the given
* Spring {@link WebSocketHandler} to the underlying runtime WebSocket API.
* @param exchange the current exchange
* @param webSocketHandler handler for WebSocket session
* @return a completion Mono for the WebSocket session handling
*/
Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler webSocketHandler);
}
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.server;
import reactor.core.publisher.Mono;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService;
import org.springframework.web.server.ServerWebExchange;
/**
* A service to delegate WebSocket-related HTTP requests to.
*
* <p>For a straight-up WebSocket endpoint this means handling the initial
* handshake request but for a SockJS endpoint this means handling all HTTP
* requests defined in the SockJS protocol.
*
* @author Rossen Stoyanchev
* @since 5.0
* @see HandshakeWebSocketService
*/
public interface WebSocketService {
/**
* Handle the HTTP request and use the given {@link WebSocketHandler}.
* @param exchange the current exchange
* @param webSocketHandler handler for WebSocket session
* @return a completion Mono for the WebSocket session handling
*/
Mono<Void> handleRequest(ServerWebExchange exchange, WebSocketHandler webSocketHandler);
}
/**
* Server support for WebSocket interactions.
*/
package org.springframework.web.reactive.socket.server;
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.server.support;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.WebSocketService;
import org.springframework.web.server.MethodNotAllowedException;
import org.springframework.web.server.ServerWebExchange;
/**
* A {@code WebSocketService} implementation that handles a WebSocket handshake
* and upgrades to a WebSocket interaction through the configured or
* auto-detected {@link RequestUpgradeStrategy}.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public class HandshakeWebSocketService implements WebSocketService {
private static final String SEC_WEBSOCKET_KEY = "Sec-WebSocket-Key";
private static final boolean rxNettyPresent = ClassUtils.isPresent(
"io.reactivex.netty.protocol.http.ws.WebSocketConnection",
HandshakeWebSocketService.class.getClassLoader());
protected static final Log logger = LogFactory.getLog(HandshakeWebSocketService.class);
private final RequestUpgradeStrategy upgradeStrategy;
/**
* Default constructor automatic, classpath detection based discovery of the
* {@link RequestUpgradeStrategy} to use.
*/
public HandshakeWebSocketService() {
this(initUpgradeStrategy());
}
/**
* Alternative constructor with the {@link RequestUpgradeStrategy} to use.
* @param upgradeStrategy the strategy to use
*/
public HandshakeWebSocketService(RequestUpgradeStrategy upgradeStrategy) {
Assert.notNull(upgradeStrategy, "'upgradeStrategy' is required");
this.upgradeStrategy = upgradeStrategy;
}
private static RequestUpgradeStrategy initUpgradeStrategy() {
String className;
if (rxNettyPresent) {
className = "RxNettyRequestUpgradeStrategy";
}
else {
throw new IllegalStateException("No suitable default RequestUpgradeStrategy found");
}
try {
className = HandshakeWebSocketService.class.getPackage().getName() + "." + className;
Class<?> clazz = ClassUtils.forName(className, HandshakeWebSocketService.class.getClassLoader());
return (RequestUpgradeStrategy) ReflectionUtils.accessibleConstructor(clazz).newInstance();
}
catch (Throwable ex) {
throw new IllegalStateException(
"Failed to instantiate RequestUpgradeStrategy: " + className, ex);
}
}
/**
* Return the {@link RequestUpgradeStrategy} for WebSocket requests.
*/
public RequestUpgradeStrategy getUpgradeStrategy() {
return this.upgradeStrategy;
}
@Override
public Mono<Void> handleRequest(ServerWebExchange exchange, WebSocketHandler webSocketHandler) {
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse response = exchange.getResponse();
if (logger.isTraceEnabled()) {
logger.trace("Processing " + request.getMethod() + " " + request.getURI());
}
if (HttpMethod.GET != request.getMethod()) {
return Mono.error(new MethodNotAllowedException(
request.getMethod().name(), Collections.singleton("GET")));
}
if (!isWebSocketUpgrade(request)) {
response.setStatusCode(HttpStatus.BAD_REQUEST);
return response.setComplete();
}
return getUpgradeStrategy().upgrade(exchange, webSocketHandler);
}
private boolean isWebSocketUpgrade(ServerHttpRequest request) {
if (!"WebSocket".equalsIgnoreCase(request.getHeaders().getUpgrade())) {
if (logger.isErrorEnabled()) {
logger.error("Invalid 'Upgrade' header: " + request.getHeaders());
}
return false;
}
List<String> connectionValue = request.getHeaders().getConnection();
if (!connectionValue.contains("Upgrade") && !connectionValue.contains("upgrade")) {
if (logger.isErrorEnabled()) {
logger.error("Invalid 'Connection' header: " + request.getHeaders());
}
return false;
}
String key = request.getHeaders().getFirst(SEC_WEBSOCKET_KEY);
if (key == null) {
if (logger.isErrorEnabled()) {
logger.error("Missing \"Sec-WebSocket-Key\" header");
}
return false;
}
return true;
}
}
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.server.support;
import reactor.core.publisher.Mono;
import org.springframework.util.Assert;
import org.springframework.web.reactive.DispatcherHandler;
import org.springframework.web.reactive.HandlerAdapter;
import org.springframework.web.reactive.HandlerResult;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.server.WebSocketService;
import org.springframework.web.server.ServerWebExchange;
/**
* {@code HandlerAdapter} that allows using a {@link WebSocketHandler} contract
* with the generic {@link DispatcherHandler} mapping URLs directly to such
* handlers. Requests are handled through the configured {@link WebSocketService}.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public class WebSocketHandlerAdapter implements HandlerAdapter {
private final WebSocketService webSocketService;
/**
* Default constructor that creates and uses a
* {@link HandshakeWebSocketService} for a straight-up WebSocket interaction,
* i.e. treating incoming requests as WebSocket handshake requests.
*/
public WebSocketHandlerAdapter() {
this(new HandshakeWebSocketService());
}
/**
* Alternative constructor with the {@link WebSocketService} to use.
*/
public WebSocketHandlerAdapter(WebSocketService webSocketService) {
Assert.notNull(webSocketService, "'webSocketService' is required");
this.webSocketService = webSocketService;
}
public WebSocketService getWebSocketService() {
return this.webSocketService;
}
@Override
public boolean supports(Object handler) {
return WebSocketHandler.class.isAssignableFrom(handler.getClass());
}
@Override
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
WebSocketHandler webSocketHandler = (WebSocketHandler) handler;
return getWebSocketService().handleRequest(exchange, webSocketHandler).then(Mono.empty());
}
}
/**
* Server-side support classes for WebSocket requests.
*/
package org.springframework.web.reactive.socket.server.support;
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.server.upgrade;
import java.util.List;
import java.util.Map;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.RxReactiveStreams;
import org.springframework.http.server.reactive.RxNettyServerHttpRequest;
import org.springframework.http.server.reactive.RxNettyServerHttpResponse;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.adapter.RxNettyWebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.server.ServerWebExchange;
/**
* A {@link RequestUpgradeStrategy} for use with RxNetty.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public class RxNettyRequestUpgradeStrategy implements RequestUpgradeStrategy {
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler webSocketHandler) {
RxNettyServerHttpRequest request = (RxNettyServerHttpRequest) exchange.getRequest();
RxNettyServerHttpResponse response = (RxNettyServerHttpResponse) exchange.getResponse();
RxNettyWebSocketHandlerAdapter rxNettyHandler =
new RxNettyWebSocketHandlerAdapter(request, response, webSocketHandler);
Observable<Void> completion = response.getRxNettyResponse()
.acceptWebSocketUpgrade(rxNettyHandler)
.subprotocol(getSubProtocols(webSocketHandler));
return Mono.from(RxReactiveStreams.toPublisher(completion));
}
private static String[] getSubProtocols(WebSocketHandler webSocketHandler) {
List<String> subProtocols = webSocketHandler.getSubProtocols();
return subProtocols.toArray(new String[subProtocols.size()]);
}
}
/**
* Holds implementations of
* {@link org.springframework.web.reactive.socket.server.RequestUpgradeStrategy}.
*/
package org.springframework.web.reactive.socket.server.upgrade;
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.server;
import org.junit.After;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.bootstrap.HttpServer;
import org.springframework.http.server.reactive.bootstrap.RxNettyHttpServer;
import org.springframework.util.SocketUtils;
import org.springframework.web.reactive.DispatcherHandler;
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.server.upgrade.RxNettyRequestUpgradeStrategy;
/**
* Base class for WebSocket integration tests involving a server-side
* {@code WebSocketHandler}. Sub-classes to return a Spring configuration class
* via {@link #getWebConfigClass()} containing a SimpleUrlHandlerMapping with
* pattern-to-WebSocketHandler mappings.
*
* @author Rossen Stoyanchev
*/
@RunWith(Parameterized.class)
@SuppressWarnings({"unused", "WeakerAccess"})
public abstract class AbstractWebSocketHandlerIntegrationTests {
protected int port;
@Parameter(0)
public HttpServer server;
@Parameter(1)
public Class<?> handlerAdapterConfigClass;
@Parameters
public static Object[][] arguments() {
return new Object[][] {
{new RxNettyHttpServer(), RxNettyConfig.class}
};
}
@Before
public void setup() throws Exception {
this.port = SocketUtils.findAvailableTcpPort();
this.server.setPort(this.port);
this.server.setHandler(createHttpHandler());
this.server.afterPropertiesSet();
this.server.start();
}
private HttpHandler createHttpHandler() {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(DispatcherConfig.class, this.handlerAdapterConfigClass);
context.register(getWebConfigClass());
context.refresh();
return DispatcherHandler.toHttpHandler(context);
}
protected abstract Class<?> getWebConfigClass();
@After
public void tearDown() throws Exception {
this.server.stop();
}
@Configuration
static class DispatcherConfig {
@Bean
public DispatcherHandler webHandler() {
return new DispatcherHandler();
}
}
static abstract class AbstractHandlerAdapterConfig {
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
RequestUpgradeStrategy strategy = createUpgradeStrategy();
WebSocketService service = new HandshakeWebSocketService(strategy);
return new WebSocketHandlerAdapter(service);
}
protected abstract RequestUpgradeStrategy createUpgradeStrategy();
}
@Configuration
static class RxNettyConfig extends AbstractHandlerAdapterConfig {
@Override
protected RequestUpgradeStrategy createUpgradeStrategy() {
return new RxNettyRequestUpgradeStrategy();
}
}
}
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.server;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.reactivex.netty.protocol.http.client.HttpClient;
import io.reactivex.netty.protocol.http.ws.client.WebSocketResponse;
import org.junit.Test;
import reactor.core.publisher.Mono;
import rx.Observable;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import static org.junit.Assert.assertEquals;
/**
* Basic WebSocket integration
* @author Rossen Stoyanchev
*/
@SuppressWarnings({"unused", "WeakerAccess"})
public class BasicWebSocketHandlerIntegrationTests extends AbstractWebSocketHandlerIntegrationTests {
@Override
protected Class<?> getWebConfigClass() {
return WebConfig.class;
}
@Test
public void echo() throws Exception {
Observable<String> messages = Observable.range(1, 10).map(i -> "Interval " + i);
List<String> actual = HttpClient.newClient("localhost", this.port)
.createGet("/echo")
.requestWebSocketUpgrade()
.flatMap(WebSocketResponse::getWebSocketConnection)
.flatMap(conn -> conn.write(messages
.map(TextWebSocketFrame::new)
.cast(WebSocketFrame.class)
.concatWith(Observable.just(new CloseWebSocketFrame())))
.cast(WebSocketFrame.class)
.mergeWith(conn.getInput())
)
.take(10)
.map(frame -> frame.content().toString(StandardCharsets.UTF_8))
.toList().toBlocking().first();
List<String> expected = messages.toList().toBlocking().first();
assertEquals(expected, actual);
}
@Configuration
static class WebConfig {
@Bean
public HandlerMapping handlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/echo", new EchoWebSocketHandler());
SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setUrlMap(map);
return mapping;
}
}
private static class EchoWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.send(session.receive());
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册