提交 a2053a51 编写于 作者: V Violeta Georgieva 提交者: Rossen Stoyanchev

Initial reactive, WebSocket Jetty support

Issue: SPR-14527
上级 80040ef4
......@@ -829,6 +829,9 @@ project("spring-web-reactive") {
exclude group: "org.apache.tomcat", module: "tomcat-websocket-api"
exclude group: "org.apache.tomcat", module: "tomcat-servlet-api"
}
optional("org.eclipse.jetty.websocket:websocket-server:${jettyVersion}") {
exclude group: "javax.servlet", module: "javax.servlet"
}
optional("io.undertow:undertow-websockets-jsr:${undertowVersion}") {
exclude group: "org.jboss.spec.javax.websocket", module: "jboss-websocket-api_1.1_spec"
}
......
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.adapter;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.OpCode;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketMessage.Type;
/**
* Jetty {@code WebSocketHandler} implementation adapting and
* delegating to a Spring {@link WebSocketHandler}.
*
* @author Violeta Georgieva
* @since 5.0
*/
@WebSocket
public class JettyWebSocketHandlerAdapter {
private static final ByteBuffer EMPTY_PAYLOAD = ByteBuffer.wrap(new byte[0]);
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(false);
private final WebSocketHandler handler;
private JettyWebSocketSession wsSession;
public JettyWebSocketHandlerAdapter(WebSocketHandler handler) {
Assert.notNull("'handler' is required");
this.handler = handler;
}
@OnWebSocketConnect
public void onWebSocketConnect(Session session) {
this.wsSession = new JettyWebSocketSession(session);
HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber();
this.handler.handle(this.wsSession).subscribe(resultSubscriber);
}
@OnWebSocketMessage
public void onWebSocketText(String message) {
if (this.wsSession != null) {
WebSocketMessage wsMessage = toMessage(Type.TEXT, message);
this.wsSession.handleMessage(wsMessage.getType(), wsMessage);
}
}
@OnWebSocketMessage
public void onWebSocketBinary(byte[] message, int offset, int length) {
if (this.wsSession != null) {
WebSocketMessage wsMessage = toMessage(Type.BINARY, ByteBuffer.wrap(message, offset, length));
wsSession.handleMessage(wsMessage.getType(), wsMessage);
}
}
@OnWebSocketFrame
public void onWebSocketFrame(Frame frame) {
if (this.wsSession != null) {
if (OpCode.PONG == frame.getOpCode()) {
ByteBuffer message = frame.getPayload() != null ? frame.getPayload() : EMPTY_PAYLOAD;
WebSocketMessage wsMessage = toMessage(Type.PONG, message);
wsSession.handleMessage(wsMessage.getType(), wsMessage);
}
}
}
@OnWebSocketClose
public void onWebSocketClose(int statusCode, String reason) {
if (this.wsSession != null) {
this.wsSession.handleClose(new CloseStatus(statusCode, reason));
}
}
@OnWebSocketError
public void onWebSocketError(Throwable cause) {
if (this.wsSession != null) {
this.wsSession.handleError(cause);
}
}
private <T> WebSocketMessage toMessage(Type type, T message) {
if (Type.TEXT.equals(type)) {
return WebSocketMessage.create(Type.TEXT,
bufferFactory.wrap(((String) message).getBytes(StandardCharsets.UTF_8)));
}
else if (Type.BINARY.equals(type)) {
return WebSocketMessage.create(Type.BINARY,
bufferFactory.wrap((ByteBuffer) message));
}
else if (Type.PONG.equals(type)) {
return WebSocketMessage.create(Type.PONG,
bufferFactory.wrap((ByteBuffer) message));
}
else {
throw new IllegalArgumentException("Unexpected message type: " + message);
}
}
private final class HandlerResultSubscriber implements Subscriber<Void> {
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Void aVoid) {
// no op
}
@Override
public void onError(Throwable ex) {
if (wsSession != null) {
wsSession.close(new CloseStatus(CloseStatus.SERVER_ERROR.getCode(), ex.getMessage()));
}
}
@Override
public void onComplete() {
if (wsSession != null) {
wsSession.close();
}
}
}
}
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.adapter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.springframework.util.ObjectUtils;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Mono;
/**
* Spring {@link WebSocketSession} adapter for Jetty's
* {@link org.eclipse.jetty.websocket.api.Session}.
*
* @author Violeta Georgieva
* @since 5.0
*/
public class JettyWebSocketSession extends AbstractListenerWebSocketSessionSupport<Session> {
public JettyWebSocketSession(Session session) {
super(session, ObjectUtils.getIdentityHexString(session),
session.getUpgradeRequest().getRequestURI());
}
@Override
protected Mono<Void> closeInternal(CloseStatus status) {
getDelegate().close(status.getCode(), status.getReason());
return Mono.empty();
}
@Override
protected boolean writeInternal(WebSocketMessage message) throws IOException {
if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
this.webSocketMessageProcessor.setReady(false);
getDelegate().getRemote().sendString(
new String(message.getPayload().asByteBuffer().array(), StandardCharsets.UTF_8),
new WebSocketMessageWriteCallback());
}
else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
this.webSocketMessageProcessor.setReady(false);
getDelegate().getRemote().sendBytes(message.getPayload().asByteBuffer(),
new WebSocketMessageWriteCallback());
}
else if (WebSocketMessage.Type.PING.equals(message.getType())) {
getDelegate().getRemote().sendPing(message.getPayload().asByteBuffer());
}
else if (WebSocketMessage.Type.PONG.equals(message.getType())) {
getDelegate().getRemote().sendPong(message.getPayload().asByteBuffer());
}
else {
throw new IllegalArgumentException("Unexpected message type: " + message.getType());
}
return true;
}
private final class WebSocketMessageWriteCallback implements WriteCallback {
@Override
public void writeFailed(Throwable x) {
webSocketMessageProcessor.cancel();
webSocketMessageProcessor.onError(x);
}
@Override
public void writeSuccess() {
webSocketMessageProcessor.setReady(true);
webSocketMessageProcessor.onWritePossible();
}
}
}
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.server.upgrade;
import java.io.IOException;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.websocket.server.WebSocketServerFactory;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
import org.springframework.context.Lifecycle;
import org.springframework.core.NamedThreadLocal;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServletServerHttpRequest;
import org.springframework.http.server.reactive.ServletServerHttpResponse;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.adapter.JettyWebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
/**
* A {@link RequestUpgradeStrategy} for use with Jetty.
*
* @author Violeta Georgieva
* @since 5.0
*/
public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Lifecycle {
private static final ThreadLocal<JettyWebSocketHandlerAdapter> wsContainerHolder =
new NamedThreadLocal<>("Jetty WebSocketHandler Adapter");
private WebSocketServerFactory factory;
private ServletContext servletContext;
private volatile boolean running = false;
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler webSocketHandler) {
JettyWebSocketHandlerAdapter adapter =
new JettyWebSocketHandlerAdapter(webSocketHandler);
HttpServletRequest servletRequest = getHttpServletRequest(exchange.getRequest());
HttpServletResponse servletResponse = getHttpServletResponse(exchange.getResponse());
if (this.servletContext == null) {
this.servletContext = servletRequest.getServletContext();
servletContext.setAttribute(DecoratedObjectFactory.ATTR, new DecoratedObjectFactory());
}
try {
start();
Assert.isTrue(this.factory.isUpgradeRequest(servletRequest, servletResponse), "Not a WebSocket handshake");
wsContainerHolder.set(adapter);
this.factory.acceptWebSocket(servletRequest, servletResponse);
}
catch (IOException ex) {
return Mono.error(ex);
}
finally {
wsContainerHolder.remove();
}
return Mono.empty();
}
@Override
public void start() {
if (!isRunning() && this.servletContext != null) {
this.running = true;
try {
this.factory = new WebSocketServerFactory(this.servletContext);
this.factory.setCreator(new WebSocketCreator() {
@Override
public Object createWebSocket(ServletUpgradeRequest req,
ServletUpgradeResponse resp) {
JettyWebSocketHandlerAdapter adapter = wsContainerHolder.get();
Assert.state(adapter != null, "Expected JettyWebSocketHandlerAdapter");
return adapter;
}
});
this.factory.start();
}
catch (Exception ex) {
throw new IllegalStateException("Unable to start Jetty WebSocketServerFactory", ex);
}
}
}
@Override
public void stop() {
if (isRunning()) {
this.running = false;
try {
this.factory.stop();
}
catch (Exception ex) {
throw new IllegalStateException("Unable to stop Jetty WebSocketServerFactory", ex);
}
}
}
@Override
public boolean isRunning() {
return this.running;
}
private final HttpServletRequest getHttpServletRequest(ServerHttpRequest request) {
Assert.isTrue(request instanceof ServletServerHttpRequest);
return ((ServletServerHttpRequest) request).getServletRequest();
}
private final HttpServletResponse getHttpServletResponse(ServerHttpResponse response) {
Assert.isTrue(response instanceof ServletServerHttpResponse);
return ((ServletServerHttpResponse) response).getServletResponse();
}
}
......@@ -31,6 +31,7 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.bootstrap.HttpServer;
import org.springframework.http.server.reactive.bootstrap.ReactorHttpServer;
import org.springframework.http.server.reactive.bootstrap.JettyHttpServer;
import org.springframework.http.server.reactive.bootstrap.RxNettyHttpServer;
import org.springframework.http.server.reactive.bootstrap.TomcatHttpServer;
import org.springframework.http.server.reactive.bootstrap.UndertowHttpServer;
......@@ -39,6 +40,7 @@ import org.springframework.web.reactive.DispatcherHandler;
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.upgrade.JettyRequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.upgrade.RxNettyRequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.upgrade.TomcatRequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.upgrade.UndertowRequestUpgradeStrategy;
......@@ -71,7 +73,8 @@ public abstract class AbstractWebSocketHandlerIntegrationTests {
{new ReactorHttpServer(), ReactorNettyConfig.class},
{new RxNettyHttpServer(), RxNettyConfig.class},
{new TomcatHttpServer(base.getAbsolutePath(), WsContextListener.class), TomcatConfig.class},
{new UndertowHttpServer(), UndertowConfig.class}
{new UndertowHttpServer(), UndertowConfig.class},
{new JettyHttpServer(), JettyConfig.class}
};
}
......@@ -162,4 +165,13 @@ public abstract class AbstractWebSocketHandlerIntegrationTests {
}
}
@Configuration
static class JettyConfig extends AbstractHandlerAdapterConfig {
@Override
protected RequestUpgradeStrategy getUpgradeStrategy() {
return new JettyRequestUpgradeStrategy();
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册