提交 140ff7ce 编写于 作者: R Rossen Stoyanchev

Polish reactive WebSocket integration tests

上级 4c005e63
......@@ -46,16 +46,15 @@ import org.springframework.web.reactive.socket.server.upgrade.TomcatRequestUpgra
import org.springframework.web.reactive.socket.server.upgrade.UndertowRequestUpgradeStrategy;
/**
* Base class for WebSocket integration tests involving a server-side
* {@code WebSocketHandler}. Sub-classes to return a Spring configuration class
* via {@link #getWebConfigClass()} containing a SimpleUrlHandlerMapping with
* pattern-to-WebSocketHandler mappings.
* Base class for WebSocket integration tests.
* Sub-classes must implement {@link #getWebConfigClass()} to return Spring
* config class with handler mappings to {@code WebSocketHandler}'s.
*
* @author Rossen Stoyanchev
*/
@RunWith(Parameterized.class)
@SuppressWarnings({"unused", "WeakerAccess"})
public abstract class AbstractWebSocketHandlerIntegrationTests {
public abstract class AbstractWebSocketIntegrationTests {
protected int port;
......
......@@ -17,7 +17,6 @@ package org.springframework.web.reactive.socket.server;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
......@@ -38,11 +37,12 @@ import org.springframework.web.reactive.socket.WebSocketSession;
import static org.junit.Assert.assertEquals;
/**
* Basic WebSocket integration tests.
* Integration tests with server-side {@link WebSocketHandler}s.
*
* @author Rossen Stoyanchev
*/
@SuppressWarnings({"unused", "WeakerAccess"})
public class BasicWebSocketHandlerIntegrationTests extends AbstractWebSocketHandlerIntegrationTests {
public class ServerWebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
@Override
......@@ -53,26 +53,22 @@ public class BasicWebSocketHandlerIntegrationTests extends AbstractWebSocketHand
@Test
public void echo() throws Exception {
Observable<String> messages = Observable.range(1, 10).map(i -> "Interval " + i);
List<String> actual = HttpClient.newClient("localhost", this.port)
int count = 100;
Observable<String> input = Observable.range(1, count).map(index -> "msg-" + index);
Observable<String> output = HttpClient.newClient("localhost", this.port)
.createGet("/echo")
.requestWebSocketUpgrade()
.flatMap(WebSocketResponse::getWebSocketConnection)
.flatMap(conn -> conn.write(messages
.map(TextWebSocketFrame::new)
.cast(WebSocketFrame.class))
.cast(WebSocketFrame.class)
.flatMap(conn -> conn
.write(input.map(TextWebSocketFrame::new)).cast(WebSocketFrame.class)
.mergeWith(conn.getInput())
)
.take(10)
.map(frame -> {
String text = frame.content().toString(StandardCharsets.UTF_8);
frame.release();
return text;
})
.toList().toBlocking().first();
List<String> expected = messages.toList().toBlocking().first();
assertEquals(expected, actual);
.take(count)
.map(frame -> {
String text = frame.content().toString(StandardCharsets.UTF_8);
frame.release();
return text;
}));
assertEquals(input.toList().toBlocking().first(), output.toList().toBlocking().first());
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册