提交 8d828dd1 编写于 作者: B Brian Clozel

Update websocket support for Jetty 9.3+

Due to a few changes in `WebSocketServerFactory` and `Session` API, our
`JettyRequestUpgradeStrategy` and `JettyWebSocketSession` needed to
adapt. As of 9.3.15+ and 9.4.0+, some reflection is required to support
previous versions.

Spring 5.0 supports Jetty 9.3 and 9.4.

Issue: SPR-14940
上级 cf6a5835
......@@ -59,7 +59,7 @@ configure(allprojects) { project ->
ext.jaxbVersion = "2.2.11"
ext.jaxwsVersion = "2.2.11"
ext.jcaVersion = "1.7"
ext.jettyVersion = "9.4.0.RC1" // RC2 has broken HEAD handling
ext.jettyVersion = "9.4.0.RC3"
ext.jmsVersion = "2.0.1"
ext.jodaVersion = "2.9.6"
ext.jpaVersion = "2.1.1"
......
......@@ -17,6 +17,7 @@
package org.springframework.web.socket.adapter.jetty;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.URI;
import java.security.Principal;
......@@ -26,11 +27,14 @@ import java.util.Map;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.springframework.http.HttpHeaders;
import org.springframework.util.ObjectUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.PingMessage;
......@@ -45,10 +49,22 @@ import org.springframework.web.socket.adapter.AbstractWebSocketSession;
*
* @author Phillip Webb
* @author Rossen Stoyanchev
* @author Brian Clozel
* @since 4.0
*/
public class JettyWebSocketSession extends AbstractWebSocketSession<Session> {
// As of Jetty 9.4, UpgradeRequest and UpgradeResponse are interfaces instead of classes
private static final boolean isJetty94;
private static Method getUpgradeRequest;
private static Method getUpgradeResponse;
private static Method getRequestURI;
private static Method getHeaders;
private static Method getAcceptedSubProtocol;
private static Method getExtensions;
private static Method getUserPrincipal;
private String id;
private URI uri;
......@@ -61,6 +77,23 @@ public class JettyWebSocketSession extends AbstractWebSocketSession<Session> {
private Principal user;
static {
isJetty94 = UpgradeRequest.class.isInterface();
if (!isJetty94) {
try {
getUpgradeRequest = Session.class.getMethod("getUpgradeRequest");
getUpgradeResponse = Session.class.getMethod("getUpgradeResponse");
getRequestURI = UpgradeRequest.class.getMethod("getRequestURI");
getHeaders = UpgradeRequest.class.getMethod("getHeaders");
getAcceptedSubProtocol = UpgradeResponse.class.getMethod("getAcceptedSubProtocol");
getExtensions = UpgradeResponse.class.getMethod("getExtensions");
getUserPrincipal = UpgradeRequest.class.getMethod("getUserPrincipal");
}
catch (NoSuchMethodException ex) {
throw new IllegalStateException("Incompatible Jetty API", ex);
}
}
}
/**
* Create a new {@link JettyWebSocketSession} instance.
......@@ -164,20 +197,64 @@ public class JettyWebSocketSession extends AbstractWebSocketSession<Session> {
@Override
public void initializeNativeSession(Session session) {
super.initializeNativeSession(session);
if (isJetty94) {
initializeJetty94Session(session);
}
else {
initializeJettySession(session);
}
}
@SuppressWarnings("unchecked")
private void initializeJettySession(Session session) {
Object request = ReflectionUtils.invokeMethod(getUpgradeRequest, session);
Object response = ReflectionUtils.invokeMethod(getUpgradeResponse, session);
this.id = ObjectUtils.getIdentityHexString(getNativeSession());
this.uri = (URI) ReflectionUtils.invokeMethod(getRequestURI, request);
this.headers = new HttpHeaders();
this.headers.putAll((Map<String, List<String>>) ReflectionUtils.invokeMethod(getHeaders, request));
this.headers = HttpHeaders.readOnlyHttpHeaders(headers);
this.acceptedProtocol = (String) ReflectionUtils.invokeMethod(getAcceptedSubProtocol, response);
List<ExtensionConfig> source = (List<ExtensionConfig>) ReflectionUtils.invokeMethod(getExtensions, response);
if (source != null) {
this.extensions = new ArrayList<>(source.size());
for (ExtensionConfig ec : source) {
this.extensions.add(new WebSocketExtension(ec.getName(), ec.getParameters()));
}
}
else {
this.extensions = new ArrayList<>(0);
}
if (this.user == null) {
this.user = (Principal) ReflectionUtils.invokeMethod(getUserPrincipal, request);
}
}
private void initializeJetty94Session(Session session) {
this.id = ObjectUtils.getIdentityHexString(getNativeSession());
this.uri = session.getUpgradeRequest().getRequestURI();
this.headers = new HttpHeaders();
this.headers.putAll(getNativeSession().getUpgradeRequest().getHeaders());
this.headers.putAll(session.getUpgradeRequest().getHeaders());
this.headers = HttpHeaders.readOnlyHttpHeaders(headers);
this.acceptedProtocol = session.getUpgradeResponse().getAcceptedSubProtocol();
List<ExtensionConfig> source = getNativeSession().getUpgradeResponse().getExtensions();
this.extensions = new ArrayList<>(source.size());
for (ExtensionConfig ec : source) {
this.extensions.add(new WebSocketExtension(ec.getName(), ec.getParameters()));
List<ExtensionConfig> source = session.getUpgradeResponse().getExtensions();
if (source != null) {
this.extensions = new ArrayList<>(source.size());
for (ExtensionConfig ec : source) {
this.extensions.add(new WebSocketExtension(ec.getName(), ec.getParameters()));
}
}
else {
this.extensions = new ArrayList<>(0);
}
if (this.user == null) {
......
......@@ -17,7 +17,6 @@
package org.springframework.web.socket.server.jetty;
import java.io.IOException;
import java.lang.reflect.Method;
import java.security.Principal;
import java.util.ArrayList;
import java.util.List;
......@@ -27,8 +26,7 @@ import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.server.HandshakeRFC6455;
......@@ -56,11 +54,12 @@ import org.springframework.web.socket.server.HandshakeFailureException;
import org.springframework.web.socket.server.RequestUpgradeStrategy;
/**
* A {@link RequestUpgradeStrategy} for use with Jetty 9.3 and higher. Based on
* A {@link RequestUpgradeStrategy} for use with Jetty 9.3 and 9.4. Based on
* Jetty's internal {@code org.eclipse.jetty.websocket.server.WebSocketHandler} class.
*
* @author Phillip Webb
* @author Rossen Stoyanchev
* @author Brian Clozel
* @since 4.0
*/
public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Lifecycle, ServletContextAware {
......@@ -68,53 +67,39 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Life
private static final ThreadLocal<WebSocketHandlerContainer> wsContainerHolder =
new NamedThreadLocal<>("WebSocket Handler Container");
// Actually 9.3.15+
private static boolean isJetty94 = ClassUtils.hasConstructor(WebSocketServerFactory.class, ServletContext.class);
private final WebSocketServerFactory factory;
private WebSocketServerFactoryAdapter factoryAdapter;
private volatile List<WebSocketExtension> supportedExtensions;
private ServletContext servletContext;
protected ServletContext servletContext;
private volatile boolean running = false;
/**
* Default constructor that creates {@link WebSocketServerFactory} through
* its default constructor thus using a default {@link WebSocketPolicy}.
*/
public JettyRequestUpgradeStrategy() {
this(new WebSocketServerFactory());
this(WebSocketPolicy.newServerPolicy());
}
/**
* A constructor accepting a {@link WebSocketServerFactory}.
* This may be useful for modifying the factory's {@link WebSocketPolicy}
* via {@link WebSocketServerFactory#getPolicy()}.
* A constructor accepting a {@link WebSocketPolicy}
* to be used when creating the {@link WebSocketServerFactory} instance.
* @since 4.3
*/
public JettyRequestUpgradeStrategy(WebSocketServerFactory factory) {
Assert.notNull(factory, "WebSocketServerFactory must not be null");
this.factory = factory;
this.factory.setCreator(new WebSocketCreator() {
@Override
public Object createWebSocket(ServletUpgradeRequest request, ServletUpgradeResponse response) {
// Cast to avoid infinite recursion
return createWebSocket((UpgradeRequest) request, (UpgradeResponse) response);
}
// For Jetty 9.0.x
public Object createWebSocket(UpgradeRequest request, UpgradeResponse response) {
WebSocketHandlerContainer container = wsContainerHolder.get();
Assert.state(container != null, "Expected WebSocketHandlerContainer");
response.setAcceptedSubProtocol(container.getSelectedProtocol());
response.setExtensions(container.getExtensionConfigs());
return container.getHandler();
}
});
public JettyRequestUpgradeStrategy(WebSocketPolicy webSocketPolicy) {
this.factoryAdapter = isJetty94 ? new Jetty94WebSocketServerFactoryAdapter()
: new JettyWebSocketServerFactoryAdapter();
this.factoryAdapter.setWebSocketPolicy(webSocketPolicy);
}
@Override
public String[] getSupportedVersions() {
return new String[] { String.valueOf(HandshakeRFC6455.VERSION) };
return new String[] {String.valueOf(HandshakeRFC6455.VERSION)};
}
@Override
......@@ -127,7 +112,7 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Life
private List<WebSocketExtension> getWebSocketExtensions() {
List<WebSocketExtension> result = new ArrayList<>();
for (String name : this.factory.getExtensionFactory().getExtensionNames()) {
for (String name : this.factoryAdapter.getFactory().getExtensionFactory().getExtensionNames()) {
result.add(new WebSocketExtension(name));
}
return result;
......@@ -149,10 +134,10 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Life
if (!isRunning()) {
this.running = true;
try {
this.factory.init(this.servletContext);
this.factoryAdapter.start();
}
catch (Exception ex) {
throw new IllegalStateException("Unable to initialize Jetty WebSocketServerFactory", ex);
throw new IllegalStateException("Unable to start Jetty WebSocketServerFactory", ex);
}
}
}
......@@ -160,8 +145,13 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Life
@Override
public void stop() {
if (isRunning()) {
this.running = false;
this.factory.cleanup();
try {
this.running = false;
this.factoryAdapter.stop();
}
catch (Exception ex) {
throw new IllegalStateException("Unable to stop Jetty WebSocketServerFactory", ex);
}
}
}
......@@ -176,7 +166,8 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Life
Assert.isInstanceOf(ServletServerHttpResponse.class, response);
HttpServletResponse servletResponse = ((ServletServerHttpResponse) response).getServletResponse();
Assert.isTrue(this.factory.isUpgradeRequest(servletRequest, servletResponse), "Not a WebSocket handshake");
Assert.isTrue(this.factoryAdapter.getFactory()
.isUpgradeRequest(servletRequest, servletResponse), "Not a WebSocket handshake");
JettyWebSocketSession session = new JettyWebSocketSession(attributes, user);
JettyWebSocketHandlerAdapter handlerAdapter = new JettyWebSocketHandlerAdapter(wsHandler, session);
......@@ -186,7 +177,7 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Life
try {
wsContainerHolder.set(container);
this.factory.acceptWebSocket(servletRequest, servletResponse);
this.factoryAdapter.getFactory().acceptWebSocket(servletRequest, servletResponse);
}
catch (IOException ex) {
throw new HandshakeFailureException(
......@@ -210,7 +201,7 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Life
this.handler = handler;
this.selectedProtocol = protocol;
if (CollectionUtils.isEmpty(extensions)) {
this.extensionConfigs = null;
this.extensionConfigs = new ArrayList<>();
}
else {
this.extensionConfigs = new ArrayList<>();
......@@ -233,4 +224,69 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Life
}
}
private static abstract class WebSocketServerFactoryAdapter {
protected WebSocketServerFactory factory;
protected WebSocketPolicy webSocketPolicy;
public WebSocketServerFactory getFactory() {
return factory;
}
public void setWebSocketPolicy(WebSocketPolicy webSocketPolicy) {
this.webSocketPolicy = webSocketPolicy;
}
protected void configureFactory() {
this.factory.setCreator(new WebSocketCreator() {
@Override
public Object createWebSocket(ServletUpgradeRequest request, ServletUpgradeResponse response) {
WebSocketHandlerContainer container = wsContainerHolder.get();
Assert.state(container != null, "Expected WebSocketHandlerContainer");
response.setAcceptedSubProtocol(container.getSelectedProtocol());
response.setExtensions(container.getExtensionConfigs());
return container.getHandler();
}
});
}
abstract void start() throws Exception;
abstract void stop() throws Exception;
}
private class JettyWebSocketServerFactoryAdapter extends WebSocketServerFactoryAdapter {
@Override
void start() throws Exception {
this.factory = WebSocketServerFactory.class.getConstructor(WebSocketPolicy.class)
.newInstance(this.webSocketPolicy);
configureFactory();
WebSocketServerFactory.class.getMethod("init", ServletContext.class)
.invoke(this.factory, servletContext);
}
@Override
void stop() throws Exception {
WebSocketServerFactory.class.getMethod("cleanup").invoke(this.factory);
}
}
private class Jetty94WebSocketServerFactoryAdapter extends WebSocketServerFactoryAdapter {
@Override
void start() throws Exception {
servletContext.setAttribute(DecoratedObjectFactory.ATTR, new DecoratedObjectFactory());
this.factory = new WebSocketServerFactory(servletContext, this.webSocketPolicy);
configureFactory();
this.factory.start();
}
@Override
void stop() throws Exception {
this.factory.stop();
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册