提交 725d6857 编写于 作者: R Rossen Stoyanchev

ReactorNettyWebSocketSession implements close properly

Issue: SPR-16774
上级 6545cab4
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -15,10 +15,12 @@
*/
package org.springframework.web.reactive.socket.adapter;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.NettyPipeline;
......@@ -42,6 +44,8 @@ import org.springframework.web.reactive.socket.WebSocketSession;
public class ReactorNettyWebSocketSession
extends NettyWebSocketSessionSupport<ReactorNettyWebSocketSession.WebSocketConnection> {
private final MonoProcessor<WebSocketFrame> closeMono = MonoProcessor.create();
public ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound,
HandshakeInfo info, NettyDataBufferFactory bufferFactory) {
......@@ -69,11 +73,8 @@ public class ReactorNettyWebSocketSession
@Override
public Mono<Void> close(CloseStatus status) {
return Mono.error(new UnsupportedOperationException(
"Reactor Netty does not support closing the session from anywhere. " +
"You will need to work with the Flux returned from receive() method, " +
"either subscribing to it and using the returned Disposable, " +
"or using an operator that cancels (e.g. take)."));
WebSocketFrame closeFrame = new CloseWebSocketFrame(status.getCode(), status.getReason());
return getDelegate().getOutbound().sendObject(closeFrame).then();
}
......
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -127,6 +127,21 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
assertEquals("my-header:my-value", output.block(Duration.ofMillis(5000)));
}
@Test
public void sessionClosing() throws Exception {
this.client.execute(getUrl("/close"),
session -> {
logger.debug("Starting..");
return session.receive()
.doOnNext(s -> logger.debug("inbound " + s))
.then()
.doFinally(signalType -> {
logger.debug("Completed with: " + signalType);
});
})
.block(Duration.ofMillis(5000));
}
@Configuration
static class WebConfig {
......@@ -137,6 +152,7 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
map.put("/echo", new EchoWebSocketHandler());
map.put("/sub-protocol", new SubProtocolWebSocketHandler());
map.put("/custom-header", new CustomHeaderHandler());
map.put("/close", new SessionClosingHandler());
SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setUrlMap(map);
......@@ -183,4 +199,12 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
}
}
private static class SessionClosingHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
return Flux.never().mergeWith(session.close(CloseStatus.GOING_AWAY)).then();
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册