From 725d685799f577b2d92e71a344c502f485a392a9 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 1 May 2018 12:03:24 -0400 Subject: [PATCH] ReactorNettyWebSocketSession implements close properly Issue: SPR-16774 --- .../adapter/ReactorNettyWebSocketSession.java | 13 +++++----- .../socket/WebSocketIntegrationTests.java | 26 ++++++++++++++++++- 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java index b769e983f5..a5cd143b7f 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,10 +15,12 @@ */ package org.springframework.web.reactive.socket.adapter; +import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; import reactor.ipc.netty.NettyInbound; import reactor.ipc.netty.NettyOutbound; import reactor.ipc.netty.NettyPipeline; @@ -42,6 +44,8 @@ import org.springframework.web.reactive.socket.WebSocketSession; public class ReactorNettyWebSocketSession extends NettyWebSocketSessionSupport { + private final MonoProcessor closeMono = MonoProcessor.create(); + public ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound, HandshakeInfo info, NettyDataBufferFactory bufferFactory) { @@ -69,11 +73,8 @@ public class ReactorNettyWebSocketSession @Override public Mono close(CloseStatus status) { - return Mono.error(new UnsupportedOperationException( - "Reactor Netty does not support closing the session from anywhere. " + - "You will need to work with the Flux returned from receive() method, " + - "either subscribing to it and using the returned Disposable, " + - "or using an operator that cancels (e.g. take).")); + WebSocketFrame closeFrame = new CloseWebSocketFrame(status.getCode(), status.getReason()); + return getDelegate().getOutbound().sendObject(closeFrame).then(); } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java index 9ef277f2c3..b53c4536ac 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -127,6 +127,21 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests assertEquals("my-header:my-value", output.block(Duration.ofMillis(5000))); } + @Test + public void sessionClosing() throws Exception { + this.client.execute(getUrl("/close"), + session -> { + logger.debug("Starting.."); + return session.receive() + .doOnNext(s -> logger.debug("inbound " + s)) + .then() + .doFinally(signalType -> { + logger.debug("Completed with: " + signalType); + }); + }) + .block(Duration.ofMillis(5000)); + } + @Configuration static class WebConfig { @@ -137,6 +152,7 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests map.put("/echo", new EchoWebSocketHandler()); map.put("/sub-protocol", new SubProtocolWebSocketHandler()); map.put("/custom-header", new CustomHeaderHandler()); + map.put("/close", new SessionClosingHandler()); SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping(); mapping.setUrlMap(map); @@ -183,4 +199,12 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests } } + private static class SessionClosingHandler implements WebSocketHandler { + + @Override + public Mono handle(WebSocketSession session) { + return Flux.never().mergeWith(session.close(CloseStatus.GOING_AWAY)).then(); + } + } + } -- GitLab