From 637b6387ea3d9cfba01a6267e4ea5cfd033f9148 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 16 Nov 2016 13:55:04 -0500 Subject: [PATCH] Starting point for reactive WebSocket support Includes basic abstractions and an RxNetty support to start. Issue: SPR-14527 --- build.gradle | 10 +- .../web/reactive/socket/CloseStatus.java | 213 ++++++++++++++++++ .../web/reactive/socket/WebSocketHandler.java | 46 ++++ .../web/reactive/socket/WebSocketMessage.java | 122 ++++++++++ .../web/reactive/socket/WebSocketSession.java | 72 ++++++ .../RxNettyWebSocketHandlerAdapter.java | 71 ++++++ .../adapter/RxNettyWebSocketSession.java | 143 ++++++++++++ .../adapter/WebSocketSessionSupport.java | 77 +++++++ .../reactive/socket/adapter/package-info.java | 4 + .../web/reactive/socket/package-info.java | 4 + .../socket/server/RequestUpgradeStrategy.java | 51 +++++ .../socket/server/WebSocketService.java | 45 ++++ .../reactive/socket/server/package-info.java | 4 + .../support/HandshakeWebSocketService.java | 155 +++++++++++++ .../support/WebSocketHandlerAdapter.java | 75 ++++++ .../socket/server/support/package-info.java | 4 + .../RxNettyRequestUpgradeStrategy.java | 61 +++++ .../socket/server/upgrade/package-info.java | 5 + ...tractWebSocketHandlerIntegrationTests.java | 122 ++++++++++ ...BasicWebSocketHandlerIntegrationTests.java | 101 +++++++++ 20 files changed, 1380 insertions(+), 5 deletions(-) create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/CloseStatus.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketHandler.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketMessage.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketHandlerAdapter.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/package-info.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/package-info.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/RequestUpgradeStrategy.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/WebSocketService.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/package-info.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/support/HandshakeWebSocketService.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/support/WebSocketHandlerAdapter.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/support/package-info.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/RxNettyRequestUpgradeStrategy.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/package-info.java create mode 100644 spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/AbstractWebSocketHandlerIntegrationTests.java create mode 100644 spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/BasicWebSocketHandlerIntegrationTests.java diff --git a/build.gradle b/build.gradle index daa83c9753..d35a0dd1a0 100644 --- a/build.gradle +++ b/build.gradle @@ -802,6 +802,11 @@ project("spring-web-reactive") { optional("org.freemarker:freemarker:${freemarkerVersion}") optional "org.apache.httpcomponents:httpclient:${httpclientVersion}" optional('org.webjars:webjars-locator:0.32') + optional("io.reactivex:rxnetty-http:${rxnettyVersion}") { + exclude group: 'io.reactivex', module: 'rxjava' + } + optional("io.reactivex:rxjava:${rxjavaVersion}") + optional("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}") testCompile("io.projectreactor.addons:reactor-test:${reactorCoreVersion}") testCompile("javax.validation:validation-api:${beanvalVersion}") testCompile("org.hibernate:hibernate-validator:${hibval5Version}") @@ -810,12 +815,7 @@ project("spring-web-reactive") { testCompile("org.eclipse.jetty:jetty-server:${jettyVersion}") testCompile("org.eclipse.jetty:jetty-servlet:${jettyVersion}") testCompile("io.projectreactor.ipc:reactor-netty:${reactorNettyVersion}") - testCompile("io.reactivex:rxnetty-http:${rxnettyVersion}") { - exclude group: 'io.reactivex', module: 'rxjava' - } - testCompile("io.reactivex:rxjava:${rxjavaVersion}") testCompile "io.reactivex.rxjava2:rxjava:${rxjava2Version}" - testCompile("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}") testCompile("io.undertow:undertow-core:${undertowVersion}") testCompile("org.jboss.xnio:xnio-api:${xnioVersion}") testCompile("com.fasterxml:aalto-xml:1.0.0") diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/CloseStatus.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/CloseStatus.java new file mode 100644 index 0000000000..edc515e2e3 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/CloseStatus.java @@ -0,0 +1,213 @@ +/* + * Copyright 2002-2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.reactive.socket; + +import org.springframework.util.Assert; +import org.springframework.util.ObjectUtils; + +/** + * Representation of WebSocket "close" status codes and reasons. Status codes + * in the 1xxx range are pre-defined by the protocol. + * + *

See + * RFC 6455, Section 7.4.1 "Defined Status Codes". + * + * @author Rossen Stoyanchev + * @since 5.0 + */ +public final class CloseStatus { + + /** + * "1000 indicates a normal closure, meaning that the purpose for which the connection + * was established has been fulfilled." + */ + public static final CloseStatus NORMAL = new CloseStatus(1000); + + /** + * "1001 indicates that an endpoint is "going away", such as a server going down or a + * browser having navigated away from a page." + */ + public static final CloseStatus GOING_AWAY = new CloseStatus(1001); + + /** + * "1002 indicates that an endpoint is terminating the connection due to a protocol + * error." + */ + public static final CloseStatus PROTOCOL_ERROR = new CloseStatus(1002); + + /** + * "1003 indicates that an endpoint is terminating the connection because it has + * received a type of data it cannot accept (e.g., an endpoint that understands only + * text data MAY send this if it receives a binary message)." + */ + public static final CloseStatus NOT_ACCEPTABLE = new CloseStatus(1003); + + // 10004: Reserved. + // The specific meaning might be defined in the future. + + /** + * "1005 is a reserved value and MUST NOT be set as a status code in a Close control + * frame by an endpoint. It is designated for use in applications expecting a status + * code to indicate that no status code was actually present." + */ + public static final CloseStatus NO_STATUS_CODE = new CloseStatus(1005); + + /** + * "1006 is a reserved value and MUST NOT be set as a status code in a Close control + * frame by an endpoint. It is designated for use in applications expecting a status + * code to indicate that the connection was closed abnormally, e.g., without sending + * or receiving a Close control frame." + */ + public static final CloseStatus NO_CLOSE_FRAME = new CloseStatus(1006); + + /** + * "1007 indicates that an endpoint is terminating the connection because it has + * received data within a message that was not consistent with the type of the message + * (e.g., non-UTF-8 [RFC3629] data within a text message)." + */ + public static final CloseStatus BAD_DATA = new CloseStatus(1007); + + /** + * "1008 indicates that an endpoint is terminating the connection because it has + * received a message that violates its policy. This is a generic status code that can + * be returned when there is no other more suitable status code (e.g., 1003 or 1009) + * or if there is a need to hide specific details about the policy." + */ + public static final CloseStatus POLICY_VIOLATION = new CloseStatus(1008); + + /** + * "1009 indicates that an endpoint is terminating the connection because it has + * received a message that is too big for it to process." + */ + public static final CloseStatus TOO_BIG_TO_PROCESS = new CloseStatus(1009); + + /** + * "1010 indicates that an endpoint (client) is terminating the connection because it + * has expected the server to negotiate one or more extension, but the server didn't + * return them in the response message of the WebSocket handshake. The list of + * extensions that are needed SHOULD appear in the /reason/ part of the Close frame. + * Note that this status code is not used by the server, because it can fail the + * WebSocket handshake instead." + */ + public static final CloseStatus REQUIRED_EXTENSION = new CloseStatus(1010); + + /** + * "1011 indicates that a server is terminating the connection because it encountered + * an unexpected condition that prevented it from fulfilling the request." + */ + public static final CloseStatus SERVER_ERROR = new CloseStatus(1011); + + /** + * "1012 indicates that the service is restarted. A client may reconnect, and if it + * chooses to do, should reconnect using a randomized delay of 5 - 30s." + */ + public static final CloseStatus SERVICE_RESTARTED = new CloseStatus(1012); + + /** + * "1013 indicates that the service is experiencing overload. A client should only + * connect to a different IP (when there are multiple for the target) or reconnect to + * the same IP upon user action." + */ + public static final CloseStatus SERVICE_OVERLOAD = new CloseStatus(1013); + + /** + * "1015 is a reserved value and MUST NOT be set as a status code in a Close control + * frame by an endpoint. It is designated for use in applications expecting a status + * code to indicate that the connection was closed due to a failure to perform a TLS + * handshake (e.g., the server certificate can't be verified)." + */ + public static final CloseStatus TLS_HANDSHAKE_FAILURE = new CloseStatus(1015); + + + private final int code; + + private final String reason; + + + /** + * Create a new {@link CloseStatus} instance. + * @param code the status code + */ + public CloseStatus(int code) { + this(code, null); + } + + /** + * Create a new {@link CloseStatus} instance. + * @param code the status code + * @param reason the reason + */ + public CloseStatus(int code, String reason) { + Assert.isTrue((code >= 1000 && code < 5000), "Invalid status code"); + this.code = code; + this.reason = reason; + } + + + /** + * Return the status code. + */ + public int getCode() { + return this.code; + } + + /** + * Return the reason, or {@code null} if none. + */ + public String getReason() { + return this.reason; + } + + /** + * Create a new {@link CloseStatus} from this one with the specified reason. + * @param reason the reason + * @return a new {@link CloseStatus} instance + */ + public CloseStatus withReason(String reason) { + Assert.hasText(reason, "Reason must not be empty"); + return new CloseStatus(this.code, reason); + } + + + public boolean equalsCode(CloseStatus other) { + return (this.code == other.code); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (!(other instanceof CloseStatus)) { + return false; + } + CloseStatus otherStatus = (CloseStatus) other; + return (this.code == otherStatus.code && + ObjectUtils.nullSafeEquals(this.reason, otherStatus.reason)); + } + + @Override + public int hashCode() { + return this.code * 29 + ObjectUtils.nullSafeHashCode(this.reason); + } + + @Override + public String toString() { + return "CloseStatus[code=" + this.code + ", reason=" + this.reason + "]"; + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketHandler.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketHandler.java new file mode 100644 index 0000000000..ce4713e3b2 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketHandler.java @@ -0,0 +1,46 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.reactive.socket; + +import java.util.Collections; +import java.util.List; + +import reactor.core.publisher.Mono; + +/** + * Handler for a WebSocket-style session interaction. + * + * @author Rossen Stoyanchev + * @since 5.0 + */ +public interface WebSocketHandler { + + /** + * Return the list of sub-protocols supported by this handler. + *

By default an empty list is returned. + */ + default List getSubProtocols() { + return Collections.emptyList(); + } + + /** + * Handle the given WebSocket session. + * @param session the session + * @return signals completion for session handling + */ + Mono handle(WebSocketSession session); + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketMessage.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketMessage.java new file mode 100644 index 0000000000..66afe78b31 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketMessage.java @@ -0,0 +1,122 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.reactive.socket; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.util.Assert; +import org.springframework.util.ObjectUtils; + +/** + * Representation of a WebSocket message. + * Use one of the static factory methods in this class to create a message. + * + * @author Rossen Stoyanchev + * @since 5.0 + */ +public class WebSocketMessage { + + private final Type type; + + private final DataBuffer payload; + + + /** + * Private constructor. See static factory methods. + */ + private WebSocketMessage(Type type, DataBuffer payload) { + Assert.notNull(type, "'type' must not be null"); + Assert.notNull(payload, "'payload' must not be null"); + this.type = type; + this.payload = payload; + } + + + /** + * Return the message type (text, binary, etc). + */ + public Type getType() { + return this.type; + } + + /** + * Return the message payload. + */ + public DataBuffer getPayload() { + return this.payload; + } + + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (!(other instanceof WebSocketMessage)) { + return false; + } + WebSocketMessage otherMessage = (WebSocketMessage) other; + return (this.type.equals(otherMessage.type) && + ObjectUtils.nullSafeEquals(this.payload, otherMessage.payload)); + } + + @Override + public int hashCode() { + return this.type.hashCode() * 29 + this.payload.hashCode(); + } + + + /** + * Factory method to create a text WebSocket message. + */ + public static WebSocketMessage text(DataBuffer payload) { + return create(Type.TEXT, payload); + } + + /** + * Factory method to create a binary WebSocket message. + */ + public static WebSocketMessage binary(DataBuffer payload) { + return create(Type.BINARY, payload); + } + + /** + * Factory method to create a ping WebSocket message. + */ + public static WebSocketMessage ping(DataBuffer payload) { + return create(Type.PING, payload); + } + + /** + * Factory method to create a pong WebSocket message. + */ + public static WebSocketMessage pong(DataBuffer payload) { + return create(Type.PONG, payload); + } + + /** + * Factory method to create a WebSocket message of the given type. + */ + public static WebSocketMessage create(Type type, DataBuffer payload) { + return new WebSocketMessage(type, payload); + } + + + /** + * WebSocket message types. + */ + public enum Type { TEXT, BINARY, PING, PONG } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java new file mode 100644 index 0000000000..f094edafa8 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java @@ -0,0 +1,72 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.reactive.socket; + +import java.net.URI; + +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.core.io.buffer.DataBuffer; + +/** + * Representation for a WebSocket session. + * + * @author Rossen Stoyanchev + * @since 5.0 + */ +public interface WebSocketSession { + + /** + * Return the id for the session. + */ + String getId(); + + /** + * Return the WebSocket endpoint URI. + */ + URI getUri(); + + /** + * Get the flux of incoming messages. + *

Note: the caller of this method is responsible for + * releasing the DataBuffer payload of each message after consuming it + * on runtimes where a {@code PooledByteBuffer} is used such as Netty. + * @see org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer) + */ + Flux receive(); + + /** + * Write the given messages to the WebSocket connection. + * @param messages the messages to write + */ + Mono send(Publisher messages); + + /** + * Close the WebSocket session with {@link CloseStatus#NORMAL}. + */ + default Mono close() { + return close(CloseStatus.NORMAL); + } + + /** + * Close the WebSocket session with the given status. + * @param status the close status + */ + Mono close(CloseStatus status); + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketHandlerAdapter.java new file mode 100644 index 0000000000..4c727575e8 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketHandlerAdapter.java @@ -0,0 +1,71 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.reactive.socket.adapter; + +import java.net.URI; + +import io.reactivex.netty.protocol.http.ws.WebSocketConnection; +import reactor.core.publisher.Mono; +import rx.Observable; +import rx.RxReactiveStreams; + +import org.springframework.core.io.buffer.NettyDataBufferFactory; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.util.Assert; +import org.springframework.web.reactive.socket.WebSocketHandler; + +/** + * RxNetty {@code WebSocketHandler} implementation adapting and delegating to a + * Spring {@link WebSocketHandler}. + * + * @author Rossen Stoyanchev + * @since 5.0 + */ +public class RxNettyWebSocketHandlerAdapter + implements io.reactivex.netty.protocol.http.ws.server.WebSocketHandler { + + private final URI uri; + + private final NettyDataBufferFactory bufferFactory; + + private final WebSocketHandler handler; + + + public RxNettyWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response, + WebSocketHandler handler) { + + Assert.notNull("'request' is required"); + Assert.notNull("'response' is required"); + Assert.notNull("'handler' handler is required"); + + this.uri = request.getURI(); + this.bufferFactory = (NettyDataBufferFactory) response.bufferFactory(); + this.handler = handler; + } + + + @Override + public Observable handle(WebSocketConnection connection) { + Mono result = this.handler.handle(createSession(connection)); + return RxReactiveStreams.toObservable(result); + } + + private RxNettyWebSocketSession createSession(WebSocketConnection conn) { + return new RxNettyWebSocketSession(conn, this.uri, this.bufferFactory); + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java new file mode 100644 index 0000000000..455b5d8eb0 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java @@ -0,0 +1,143 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.reactive.socket.adapter; + +import java.net.URI; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; +import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; +import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; +import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketFrame; +import io.reactivex.netty.protocol.http.ws.WebSocketConnection; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import rx.Observable; +import rx.RxReactiveStreams; + +import org.springframework.core.io.buffer.NettyDataBuffer; +import org.springframework.core.io.buffer.NettyDataBufferFactory; +import org.springframework.util.Assert; +import org.springframework.util.ObjectUtils; +import org.springframework.web.reactive.socket.CloseStatus; +import org.springframework.web.reactive.socket.WebSocketMessage; + +/** + * + * @author Rossen Stoyanchev + * @since 5.0 + */ +public class RxNettyWebSocketSession extends WebSocketSessionSupport { + + private static final Map, WebSocketMessage.Type> MESSAGE_TYPES; + + static { + MESSAGE_TYPES = new HashMap<>(4); + MESSAGE_TYPES.put(TextWebSocketFrame.class, WebSocketMessage.Type.TEXT); + MESSAGE_TYPES.put(BinaryWebSocketFrame.class, WebSocketMessage.Type.BINARY); + MESSAGE_TYPES.put(PingWebSocketFrame.class, WebSocketMessage.Type.PING); + MESSAGE_TYPES.put(PongWebSocketFrame.class, WebSocketMessage.Type.PONG); + } + + + private final String id; + + private final URI uri; + + private final NettyDataBufferFactory bufferFactory; + + + public RxNettyWebSocketSession(WebSocketConnection conn, URI uri, NettyDataBufferFactory factory) { + super(conn); + Assert.notNull(uri, "'uri' is required."); + Assert.notNull(uri, "'bufferFactory' is required."); + this.id = ObjectUtils.getIdentityHexString(getDelegate()); + this.uri = uri; + this.bufferFactory = factory; + } + + + @Override + public String getId() { + return this.id; + } + + @Override + public URI getUri() { + return this.uri; + } + + @Override + public Flux receive() { + return Flux.from(RxReactiveStreams.toPublisher(getDelegate().getInput())) + .filter(frame -> !(frame instanceof CloseWebSocketFrame)) + .window() + .concatMap(flux -> flux.takeUntil(WebSocketFrame::isFinalFragment).buffer()) + .map(this::toMessage); + } + + @SuppressWarnings("OptionalGetWithoutIsPresent") + private WebSocketMessage toMessage(List frames) { + Class frameType = frames.get(0).getClass(); + if (frames.size() == 1) { + NettyDataBuffer buffer = this.bufferFactory.wrap(frames.get(0).content()); + return WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer); + } + return frames.stream() + .map(socketFrame -> bufferFactory.wrap(socketFrame.content())) + .reduce(NettyDataBuffer::write) + .map(buffer -> WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer)) + .get(); + } + + @Override + public Mono send(Publisher messages) { + Observable frames = RxReactiveStreams.toObservable(messages).map(this::toFrame); + Observable completion = getDelegate().write(frames); + return Mono.from(RxReactiveStreams.toPublisher(completion)); + } + + private WebSocketFrame toFrame(WebSocketMessage message) { + ByteBuf byteBuf = NettyDataBufferFactory.toByteBuf(message.getPayload()); + if (WebSocketMessage.Type.TEXT.equals(message.getType())) { + return new TextWebSocketFrame(byteBuf); + } + else if (WebSocketMessage.Type.BINARY.equals(message.getType())) { + return new BinaryWebSocketFrame(byteBuf); + } + else if (WebSocketMessage.Type.PING.equals(message.getType())) { + return new PingWebSocketFrame(byteBuf); + } + else if (WebSocketMessage.Type.BINARY.equals(message.getType())) { + return new PongWebSocketFrame(byteBuf); + } + else { + throw new IllegalArgumentException("Unexpected message type: " + message.getType()); + } + } + + @Override + protected Mono closeInternal(CloseStatus status) { + return Mono.from(RxReactiveStreams.toPublisher(getDelegate().close())); + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java new file mode 100644 index 0000000000..0e5ec0daf7 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java @@ -0,0 +1,77 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.reactive.socket.adapter; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import reactor.core.publisher.Mono; + +import org.springframework.util.Assert; +import org.springframework.web.reactive.socket.CloseStatus; +import org.springframework.web.reactive.socket.WebSocketSession; + +/** + * Base class for {@link WebSocketSession} implementations wrapping and + * delegating to the native WebSocket session (or connection) of the underlying + * WebSocket runtime. + * + * @author Rossen Stoyanchev + * @since 5.0 + */ +public abstract class WebSocketSessionSupport implements WebSocketSession { + + protected final Log logger = LogFactory.getLog(getClass()); + + + private final T delegate; + + + /** + * Create a new instance and associate the given attributes with it. + * @param delegate the underlying WebSocket connection + */ + protected WebSocketSessionSupport(T delegate) { + Assert.notNull(delegate, "'delegate' session is required."); + this.delegate = delegate; + } + + + /** + * Return the native session of the underlying runtime. + */ + public T getDelegate() { + return this.delegate; + } + + + @Override + public final Mono close(CloseStatus status) { + if (logger.isDebugEnabled()) { + logger.debug("Closing " + this); + } + return closeInternal(status); + } + + protected abstract Mono closeInternal(CloseStatus status); + + + @Override + public String toString() { + return getClass().getSimpleName() + "[id=" + getId() + ", uri=" + getUri() + "]"; + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/package-info.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/package-info.java new file mode 100644 index 0000000000..aa675f1b3b --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/package-info.java @@ -0,0 +1,4 @@ +/** + * Classes adapting Spring's Reactive WebSocket API to and from WebSocket runtimes. + */ +package org.springframework.web.reactive.socket.adapter; diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/package-info.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/package-info.java new file mode 100644 index 0000000000..38e4baf22b --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/package-info.java @@ -0,0 +1,4 @@ +/** + * Abstractions and support classes for WebSocket interactions. + */ +package org.springframework.web.reactive.socket; diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/RequestUpgradeStrategy.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/RequestUpgradeStrategy.java new file mode 100644 index 0000000000..61c819eafa --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/RequestUpgradeStrategy.java @@ -0,0 +1,51 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.reactive.socket.server; + +import java.util.Map; + +import reactor.core.publisher.Mono; + +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.server.ServerWebExchange; + +/** + * A strategy for upgrading an HTTP request to a WebSocket interaction depending + * on the underlying HTTP runtime. + * + *

Typically there is one such strategy for every {@link ServerHttpRequest} + * and {@link ServerHttpResponse} implementation type except in the case of + * Servlet containers for which there is no standard API to upgrade a request. + * JSR-356 does have programmatic endpoint registration but that is only + * intended for use on startup and not per request. + * + * @author Rossen Stoyanchev + * @since 5.0 + */ +public interface RequestUpgradeStrategy { + + /** + * Upgrade the request to a WebSocket interaction and adapt the given + * Spring {@link WebSocketHandler} to the underlying runtime WebSocket API. + * @param exchange the current exchange + * @param webSocketHandler handler for WebSocket session + * @return a completion Mono for the WebSocket session handling + */ + Mono upgrade(ServerWebExchange exchange, WebSocketHandler webSocketHandler); + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/WebSocketService.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/WebSocketService.java new file mode 100644 index 0000000000..27d0f3cf9d --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/WebSocketService.java @@ -0,0 +1,45 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.reactive.socket.server; + +import reactor.core.publisher.Mono; + +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService; +import org.springframework.web.server.ServerWebExchange; + +/** + * A service to delegate WebSocket-related HTTP requests to. + * + *

For a straight-up WebSocket endpoint this means handling the initial + * handshake request but for a SockJS endpoint this means handling all HTTP + * requests defined in the SockJS protocol. + * + * @author Rossen Stoyanchev + * @since 5.0 + * @see HandshakeWebSocketService + */ +public interface WebSocketService { + + /** + * Handle the HTTP request and use the given {@link WebSocketHandler}. + * @param exchange the current exchange + * @param webSocketHandler handler for WebSocket session + * @return a completion Mono for the WebSocket session handling + */ + Mono handleRequest(ServerWebExchange exchange, WebSocketHandler webSocketHandler); + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/package-info.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/package-info.java new file mode 100644 index 0000000000..0c0cb9c23c --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/package-info.java @@ -0,0 +1,4 @@ +/** + * Server support for WebSocket interactions. + */ +package org.springframework.web.reactive.socket.server; diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/support/HandshakeWebSocketService.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/support/HandshakeWebSocketService.java new file mode 100644 index 0000000000..b96d813963 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/support/HandshakeWebSocketService.java @@ -0,0 +1,155 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.reactive.socket.server.support; + +import java.util.Collections; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import reactor.core.publisher.Mono; + +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; +import org.springframework.util.ReflectionUtils; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; +import org.springframework.web.reactive.socket.server.WebSocketService; +import org.springframework.web.server.MethodNotAllowedException; +import org.springframework.web.server.ServerWebExchange; + +/** + * A {@code WebSocketService} implementation that handles a WebSocket handshake + * and upgrades to a WebSocket interaction through the configured or + * auto-detected {@link RequestUpgradeStrategy}. + * + * @author Rossen Stoyanchev + * @since 5.0 + */ +public class HandshakeWebSocketService implements WebSocketService { + + private static final String SEC_WEBSOCKET_KEY = "Sec-WebSocket-Key"; + + + private static final boolean rxNettyPresent = ClassUtils.isPresent( + "io.reactivex.netty.protocol.http.ws.WebSocketConnection", + HandshakeWebSocketService.class.getClassLoader()); + + + protected static final Log logger = LogFactory.getLog(HandshakeWebSocketService.class); + + + private final RequestUpgradeStrategy upgradeStrategy; + + + /** + * Default constructor automatic, classpath detection based discovery of the + * {@link RequestUpgradeStrategy} to use. + */ + public HandshakeWebSocketService() { + this(initUpgradeStrategy()); + } + + /** + * Alternative constructor with the {@link RequestUpgradeStrategy} to use. + * @param upgradeStrategy the strategy to use + */ + public HandshakeWebSocketService(RequestUpgradeStrategy upgradeStrategy) { + Assert.notNull(upgradeStrategy, "'upgradeStrategy' is required"); + this.upgradeStrategy = upgradeStrategy; + } + + private static RequestUpgradeStrategy initUpgradeStrategy() { + String className; + if (rxNettyPresent) { + className = "RxNettyRequestUpgradeStrategy"; + } + else { + throw new IllegalStateException("No suitable default RequestUpgradeStrategy found"); + } + + try { + className = HandshakeWebSocketService.class.getPackage().getName() + "." + className; + Class clazz = ClassUtils.forName(className, HandshakeWebSocketService.class.getClassLoader()); + return (RequestUpgradeStrategy) ReflectionUtils.accessibleConstructor(clazz).newInstance(); + } + catch (Throwable ex) { + throw new IllegalStateException( + "Failed to instantiate RequestUpgradeStrategy: " + className, ex); + } + } + + + /** + * Return the {@link RequestUpgradeStrategy} for WebSocket requests. + */ + public RequestUpgradeStrategy getUpgradeStrategy() { + return this.upgradeStrategy; + } + + + @Override + public Mono handleRequest(ServerWebExchange exchange, WebSocketHandler webSocketHandler) { + + ServerHttpRequest request = exchange.getRequest(); + ServerHttpResponse response = exchange.getResponse(); + + if (logger.isTraceEnabled()) { + logger.trace("Processing " + request.getMethod() + " " + request.getURI()); + } + + if (HttpMethod.GET != request.getMethod()) { + return Mono.error(new MethodNotAllowedException( + request.getMethod().name(), Collections.singleton("GET"))); + } + + if (!isWebSocketUpgrade(request)) { + response.setStatusCode(HttpStatus.BAD_REQUEST); + return response.setComplete(); + } + + return getUpgradeStrategy().upgrade(exchange, webSocketHandler); + } + + private boolean isWebSocketUpgrade(ServerHttpRequest request) { + if (!"WebSocket".equalsIgnoreCase(request.getHeaders().getUpgrade())) { + if (logger.isErrorEnabled()) { + logger.error("Invalid 'Upgrade' header: " + request.getHeaders()); + } + return false; + } + List connectionValue = request.getHeaders().getConnection(); + if (!connectionValue.contains("Upgrade") && !connectionValue.contains("upgrade")) { + if (logger.isErrorEnabled()) { + logger.error("Invalid 'Connection' header: " + request.getHeaders()); + } + return false; + } + String key = request.getHeaders().getFirst(SEC_WEBSOCKET_KEY); + if (key == null) { + if (logger.isErrorEnabled()) { + logger.error("Missing \"Sec-WebSocket-Key\" header"); + } + return false; + } + return true; + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/support/WebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/support/WebSocketHandlerAdapter.java new file mode 100644 index 0000000000..81f5c7a97f --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/support/WebSocketHandlerAdapter.java @@ -0,0 +1,75 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.reactive.socket.server.support; + +import reactor.core.publisher.Mono; + +import org.springframework.util.Assert; +import org.springframework.web.reactive.DispatcherHandler; +import org.springframework.web.reactive.HandlerAdapter; +import org.springframework.web.reactive.HandlerResult; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.server.WebSocketService; +import org.springframework.web.server.ServerWebExchange; + +/** + * {@code HandlerAdapter} that allows using a {@link WebSocketHandler} contract + * with the generic {@link DispatcherHandler} mapping URLs directly to such + * handlers. Requests are handled through the configured {@link WebSocketService}. + * + * @author Rossen Stoyanchev + * @since 5.0 + */ +public class WebSocketHandlerAdapter implements HandlerAdapter { + + private final WebSocketService webSocketService; + + + /** + * Default constructor that creates and uses a + * {@link HandshakeWebSocketService} for a straight-up WebSocket interaction, + * i.e. treating incoming requests as WebSocket handshake requests. + */ + public WebSocketHandlerAdapter() { + this(new HandshakeWebSocketService()); + } + + /** + * Alternative constructor with the {@link WebSocketService} to use. + */ + public WebSocketHandlerAdapter(WebSocketService webSocketService) { + Assert.notNull(webSocketService, "'webSocketService' is required"); + this.webSocketService = webSocketService; + } + + + public WebSocketService getWebSocketService() { + return this.webSocketService; + } + + + @Override + public boolean supports(Object handler) { + return WebSocketHandler.class.isAssignableFrom(handler.getClass()); + } + + @Override + public Mono handle(ServerWebExchange exchange, Object handler) { + WebSocketHandler webSocketHandler = (WebSocketHandler) handler; + return getWebSocketService().handleRequest(exchange, webSocketHandler).then(Mono.empty()); + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/support/package-info.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/support/package-info.java new file mode 100644 index 0000000000..4f4a56ea33 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/support/package-info.java @@ -0,0 +1,4 @@ +/** + * Server-side support classes for WebSocket requests. + */ +package org.springframework.web.reactive.socket.server.support; diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/RxNettyRequestUpgradeStrategy.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/RxNettyRequestUpgradeStrategy.java new file mode 100644 index 0000000000..b923f38166 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/RxNettyRequestUpgradeStrategy.java @@ -0,0 +1,61 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.reactive.socket.server.upgrade; + +import java.util.List; +import java.util.Map; + +import reactor.core.publisher.Mono; +import rx.Observable; +import rx.RxReactiveStreams; + +import org.springframework.http.server.reactive.RxNettyServerHttpRequest; +import org.springframework.http.server.reactive.RxNettyServerHttpResponse; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.adapter.RxNettyWebSocketHandlerAdapter; +import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; +import org.springframework.web.server.ServerWebExchange; + +/** + * A {@link RequestUpgradeStrategy} for use with RxNetty. + * + * @author Rossen Stoyanchev + * @since 5.0 + */ +public class RxNettyRequestUpgradeStrategy implements RequestUpgradeStrategy { + + @Override + public Mono upgrade(ServerWebExchange exchange, WebSocketHandler webSocketHandler) { + + RxNettyServerHttpRequest request = (RxNettyServerHttpRequest) exchange.getRequest(); + RxNettyServerHttpResponse response = (RxNettyServerHttpResponse) exchange.getResponse(); + + RxNettyWebSocketHandlerAdapter rxNettyHandler = + new RxNettyWebSocketHandlerAdapter(request, response, webSocketHandler); + + Observable completion = response.getRxNettyResponse() + .acceptWebSocketUpgrade(rxNettyHandler) + .subprotocol(getSubProtocols(webSocketHandler)); + + return Mono.from(RxReactiveStreams.toPublisher(completion)); + } + + private static String[] getSubProtocols(WebSocketHandler webSocketHandler) { + List subProtocols = webSocketHandler.getSubProtocols(); + return subProtocols.toArray(new String[subProtocols.size()]); + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/package-info.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/package-info.java new file mode 100644 index 0000000000..6fdce1fc11 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/package-info.java @@ -0,0 +1,5 @@ +/** + * Holds implementations of + * {@link org.springframework.web.reactive.socket.server.RequestUpgradeStrategy}. + */ +package org.springframework.web.reactive.socket.server.upgrade; diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/AbstractWebSocketHandlerIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/AbstractWebSocketHandlerIntegrationTests.java new file mode 100644 index 0000000000..ec73d6a331 --- /dev/null +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/AbstractWebSocketHandlerIntegrationTests.java @@ -0,0 +1,122 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.reactive.socket.server; + +import org.junit.After; +import org.junit.Before; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.server.reactive.HttpHandler; +import org.springframework.http.server.reactive.bootstrap.HttpServer; +import org.springframework.http.server.reactive.bootstrap.RxNettyHttpServer; +import org.springframework.util.SocketUtils; +import org.springframework.web.reactive.DispatcherHandler; +import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService; +import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter; +import org.springframework.web.reactive.socket.server.upgrade.RxNettyRequestUpgradeStrategy; + +/** + * Base class for WebSocket integration tests involving a server-side + * {@code WebSocketHandler}. Sub-classes to return a Spring configuration class + * via {@link #getWebConfigClass()} containing a SimpleUrlHandlerMapping with + * pattern-to-WebSocketHandler mappings. + * + * @author Rossen Stoyanchev + */ +@RunWith(Parameterized.class) +@SuppressWarnings({"unused", "WeakerAccess"}) +public abstract class AbstractWebSocketHandlerIntegrationTests { + + protected int port; + + @Parameter(0) + public HttpServer server; + + @Parameter(1) + public Class handlerAdapterConfigClass; + + + @Parameters + public static Object[][] arguments() { + return new Object[][] { + {new RxNettyHttpServer(), RxNettyConfig.class} + }; + } + + + @Before + public void setup() throws Exception { + this.port = SocketUtils.findAvailableTcpPort(); + this.server.setPort(this.port); + this.server.setHandler(createHttpHandler()); + this.server.afterPropertiesSet(); + this.server.start(); + } + + private HttpHandler createHttpHandler() { + AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); + context.register(DispatcherConfig.class, this.handlerAdapterConfigClass); + context.register(getWebConfigClass()); + context.refresh(); + return DispatcherHandler.toHttpHandler(context); + } + + protected abstract Class getWebConfigClass(); + + @After + public void tearDown() throws Exception { + this.server.stop(); + } + + + @Configuration + static class DispatcherConfig { + + @Bean + public DispatcherHandler webHandler() { + return new DispatcherHandler(); + } + } + + static abstract class AbstractHandlerAdapterConfig { + + @Bean + public WebSocketHandlerAdapter handlerAdapter() { + RequestUpgradeStrategy strategy = createUpgradeStrategy(); + WebSocketService service = new HandshakeWebSocketService(strategy); + return new WebSocketHandlerAdapter(service); + } + + protected abstract RequestUpgradeStrategy createUpgradeStrategy(); + + } + + @Configuration + static class RxNettyConfig extends AbstractHandlerAdapterConfig { + + @Override + protected RequestUpgradeStrategy createUpgradeStrategy() { + return new RxNettyRequestUpgradeStrategy(); + } + } + +} diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/BasicWebSocketHandlerIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/BasicWebSocketHandlerIntegrationTests.java new file mode 100644 index 0000000000..cd2dd26b00 --- /dev/null +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/BasicWebSocketHandlerIntegrationTests.java @@ -0,0 +1,101 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.reactive.socket.server; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketFrame; +import io.reactivex.netty.protocol.http.client.HttpClient; +import io.reactivex.netty.protocol.http.ws.client.WebSocketResponse; +import org.junit.Test; +import reactor.core.publisher.Mono; +import rx.Observable; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.reactive.HandlerMapping; +import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.WebSocketSession; + +import static org.junit.Assert.assertEquals; + +/** + * Basic WebSocket integration + * @author Rossen Stoyanchev + */ +@SuppressWarnings({"unused", "WeakerAccess"}) +public class BasicWebSocketHandlerIntegrationTests extends AbstractWebSocketHandlerIntegrationTests { + + + @Override + protected Class getWebConfigClass() { + return WebConfig.class; + } + + + @Test + public void echo() throws Exception { + Observable messages = Observable.range(1, 10).map(i -> "Interval " + i); + List actual = HttpClient.newClient("localhost", this.port) + .createGet("/echo") + .requestWebSocketUpgrade() + .flatMap(WebSocketResponse::getWebSocketConnection) + .flatMap(conn -> conn.write(messages + .map(TextWebSocketFrame::new) + .cast(WebSocketFrame.class) + .concatWith(Observable.just(new CloseWebSocketFrame()))) + .cast(WebSocketFrame.class) + .mergeWith(conn.getInput()) + ) + .take(10) + .map(frame -> frame.content().toString(StandardCharsets.UTF_8)) + .toList().toBlocking().first(); + List expected = messages.toList().toBlocking().first(); + assertEquals(expected, actual); + } + + + @Configuration + static class WebConfig { + + @Bean + public HandlerMapping handlerMapping() { + + Map map = new HashMap<>(); + map.put("/echo", new EchoWebSocketHandler()); + + SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping(); + mapping.setUrlMap(map); + return mapping; + } + + } + + private static class EchoWebSocketHandler implements WebSocketHandler { + + @Override + public Mono handle(WebSocketSession session) { + return session.send(session.receive()); + } + } + +} -- GitLab