提交 edcf0491 编写于 作者: R Rossen Stoyanchev

Refactoring in reactive WebSocketMessage

Move WebSocketMessage factory methods to the WebSocketSession which
has the bufferFactory() needed to create message payloads.

WebSocketMessage is left with one public constructor.

WebSocketMessage exposes convenience retain/releasePayload methods.
上级 6cd92c69
......@@ -15,7 +15,10 @@
*/
package org.springframework.web.reactive.socket;
import java.nio.charset.StandardCharsets;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
......@@ -34,9 +37,17 @@ public class WebSocketMessage {
/**
* Private constructor. See static factory methods.
* Constructor for a WebSocketMessage. To create, see factory methods:
* <ul>
* <li>{@link WebSocketSession#textMessage}
* <li>{@link WebSocketSession#binaryMessage}
* <li>{@link WebSocketSession#pingMessage}
* <li>{@link WebSocketSession#pongMessage}
* </ul>
* <p>Alternatively use {@link WebSocketSession#bufferFactory()} to create
* the payload and then invoke this constructor.
*/
private WebSocketMessage(Type type, DataBuffer payload) {
public WebSocketMessage(Type type, DataBuffer payload) {
Assert.notNull(type, "'type' must not be null");
Assert.notNull(payload, "'payload' must not be null");
this.type = type;
......@@ -58,6 +69,42 @@ public class WebSocketMessage {
return this.payload;
}
/**
* Return the message payload as UTF-8 text. This is a useful for text
* WebSocket messages.
*/
public String getPayloadAsText() {
byte[] bytes = new byte[this.payload.readableByteCount()];
this.payload.read(bytes);
return new String(bytes, StandardCharsets.UTF_8);
}
/**
* Retain the data buffer for the message payload, which is useful on
* runtimes with pooled buffers, e.g. Netty. A shortcut for:
* <pre>
* DataBuffer payload = message.getPayload();
* DataBufferUtils.retain(payload);
* </pre>
* @see DataBufferUtils#retain(DataBuffer)
*/
public void retainPayload() {
DataBufferUtils.retain(this.payload);
}
/**
* Release the data buffer for the message payload, which is useful on
* runtimes with pooled buffers, e.g. Netty. This is a shortcut for:
* <pre>
* DataBuffer payload = message.getPayload();
* DataBufferUtils.release(payload);
* </pre>
* @see DataBufferUtils#release(DataBuffer)
*/
public void releasePayload() {
DataBufferUtils.release(this.payload);
}
@Override
public boolean equals(Object other) {
......@@ -78,42 +125,6 @@ public class WebSocketMessage {
}
/**
* Factory method to create a text WebSocket message.
*/
public static WebSocketMessage text(DataBuffer payload) {
return create(Type.TEXT, payload);
}
/**
* Factory method to create a binary WebSocket message.
*/
public static WebSocketMessage binary(DataBuffer payload) {
return create(Type.BINARY, payload);
}
/**
* Factory method to create a ping WebSocket message.
*/
public static WebSocketMessage ping(DataBuffer payload) {
return create(Type.PING, payload);
}
/**
* Factory method to create a pong WebSocket message.
*/
public static WebSocketMessage pong(DataBuffer payload) {
return create(Type.PONG, payload);
}
/**
* Factory method to create a WebSocket message of the given type.
*/
public static WebSocketMessage create(Type type, DataBuffer payload) {
return new WebSocketMessage(type, payload);
}
/**
* WebSocket message types.
*/
......
......@@ -16,11 +16,13 @@
package org.springframework.web.reactive.socket;
import java.net.URI;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
/**
......@@ -58,6 +60,30 @@ public interface WebSocketSession {
*/
Mono<Void> send(Publisher<WebSocketMessage> messages);
/**
* Factory method to create a text {@link WebSocketMessage} using the
* {@link #bufferFactory()} for the session.
*/
WebSocketMessage textMessage(String payload);
/**
* Factory method to create a binary WebSocketMessage using the
* {@link #bufferFactory()} for the session.
*/
WebSocketMessage binaryMessage(Function<DataBufferFactory, DataBuffer> payloadFactory);
/**
* Factory method to create a ping WebSocketMessage using the
* {@link #bufferFactory()} for the session.
*/
WebSocketMessage pingMessage(Function<DataBufferFactory, DataBuffer> payloadFactory);
/**
* Factory method to create a pong WebSocketMessage using the
* {@link #bufferFactory()} for the session.
*/
WebSocketMessage pongMessage(Function<DataBufferFactory, DataBuffer> payloadFactory);
/**
* Close the WebSocket session with {@link CloseStatus#NORMAL}.
*/
......
......@@ -101,15 +101,15 @@ public class JettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport
if (Type.TEXT.equals(type)) {
byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8);
DataBuffer buffer = getBufferFactory().wrap(bytes);
return WebSocketMessage.create(Type.TEXT, buffer);
return new WebSocketMessage(Type.TEXT, buffer);
}
else if (Type.BINARY.equals(type)) {
DataBuffer buffer = getBufferFactory().wrap((ByteBuffer) message);
return WebSocketMessage.create(Type.BINARY, buffer);
return new WebSocketMessage(Type.BINARY, buffer);
}
else if (Type.PONG.equals(type)) {
DataBuffer buffer = getBufferFactory().wrap((ByteBuffer) message);
return WebSocketMessage.create(Type.PONG, buffer);
return new WebSocketMessage(Type.PONG, buffer);
}
else {
throw new IllegalArgumentException("Unexpected message type: " + message);
......
......@@ -78,12 +78,12 @@ public abstract class NettyWebSocketSessionSupport<T> extends WebSocketSessionSu
Class<?> frameType = frames.get(0).getClass();
if (frames.size() == 1) {
NettyDataBuffer buffer = bufferFactory().wrap(frames.get(0).content());
return WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer);
return new WebSocketMessage(MESSAGE_TYPES.get(frameType), buffer);
}
return frames.stream()
.map(socketFrame -> bufferFactory().wrap(socketFrame.content()))
.reduce(NettyDataBuffer::write)
.map(buffer -> WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer))
.map(buffer -> new WebSocketMessage(MESSAGE_TYPES.get(frameType), buffer))
.get();
}
......
......@@ -90,15 +90,15 @@ public class TomcatWebSocketHandlerAdapter extends WebSocketHandlerAdapterSuppor
private <T> WebSocketMessage toMessage(T message) {
if (message instanceof String) {
byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8);
return WebSocketMessage.create(Type.TEXT, getBufferFactory().wrap(bytes));
return new WebSocketMessage(Type.TEXT, getBufferFactory().wrap(bytes));
}
else if (message instanceof ByteBuffer) {
DataBuffer buffer = getBufferFactory().wrap((ByteBuffer) message);
return WebSocketMessage.create(Type.BINARY, buffer);
return new WebSocketMessage(Type.BINARY, buffer);
}
else if (message instanceof PongMessage) {
DataBuffer buffer = getBufferFactory().wrap(((PongMessage) message).getApplicationData());
return WebSocketMessage.create(Type.PONG, buffer);
return new WebSocketMessage(Type.PONG, buffer);
}
else {
throw new IllegalArgumentException("Unexpected message type: " + message);
......
......@@ -102,15 +102,15 @@ public class UndertowWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupp
private <T> WebSocketMessage toMessage(Type type, T message) {
if (Type.TEXT.equals(type)) {
byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8);
return WebSocketMessage.create(Type.TEXT, getBufferFactory().wrap(bytes));
return new WebSocketMessage(Type.TEXT, getBufferFactory().wrap(bytes));
}
else if (Type.BINARY.equals(type)) {
DataBuffer buffer = getBufferFactory().allocateBuffer().write((ByteBuffer[]) message);
return WebSocketMessage.create(Type.BINARY, buffer);
return new WebSocketMessage(Type.BINARY, buffer);
}
else if (Type.PONG.equals(type)) {
DataBuffer buffer = getBufferFactory().allocateBuffer().write((ByteBuffer[]) message);
return WebSocketMessage.create(Type.PONG, buffer);
return new WebSocketMessage(Type.PONG, buffer);
}
else {
throw new IllegalArgumentException("Unexpected message type: " + message);
......
......@@ -17,14 +17,18 @@
package org.springframework.web.reactive.socket.adapter;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
/**
......@@ -87,6 +91,30 @@ public abstract class WebSocketSessionSupport<T> implements WebSocketSession {
return this.bufferFactory;
}
@Override
public WebSocketMessage textMessage(String payload) {
byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
DataBuffer buffer = bufferFactory().wrap(bytes);
return new WebSocketMessage(WebSocketMessage.Type.TEXT, buffer);
}
@Override
public WebSocketMessage binaryMessage(Function<DataBufferFactory, DataBuffer> payloadFactory) {
DataBuffer payload = payloadFactory.apply(bufferFactory());
return new WebSocketMessage(WebSocketMessage.Type.BINARY, payload);
}
@Override
public WebSocketMessage pingMessage(Function<DataBufferFactory, DataBuffer> payloadFactory) {
DataBuffer payload = payloadFactory.apply(bufferFactory());
return new WebSocketMessage(WebSocketMessage.Type.PING, payload);
}
@Override
public WebSocketMessage pongMessage(Function<DataBufferFactory, DataBuffer> payloadFactory) {
DataBuffer payload = payloadFactory.apply(bufferFactory());
return new WebSocketMessage(WebSocketMessage.Type.PONG, payload);
}
@Override
public final Mono<Void> close(CloseStatus status) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册