提交 18e491ac 编写于 作者: A Arjen Poutsma

Merge pull request #1143 from poutsma/server_sent_event

* server_sent_event:
  Refactored SseEvent to ServerSentEvent
......@@ -44,7 +44,7 @@ import org.springframework.http.codec.EncoderHttpMessageWriter;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.codec.HttpMessageWriter;
import org.springframework.http.codec.ResourceHttpMessageWriter;
import org.springframework.http.codec.SseEventHttpMessageWriter;
import org.springframework.http.codec.ServerSentEventHttpMessageWriter;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.http.codec.xml.Jaxb2XmlDecoder;
......@@ -372,7 +372,7 @@ public class WebReactiveConfiguration implements ApplicationContextAware {
writers.add(new EncoderHttpMessageWriter<>(jacksonEncoder));
sseDataEncoders.add(jacksonEncoder);
}
writers.add(new SseEventHttpMessageWriter(sseDataEncoders));
writers.add(new ServerSentEventHttpMessageWriter(sseDataEncoders));
}
/**
* Override this to modify the list of message writers after it has been
......
......@@ -28,7 +28,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.codec.SseEvent;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.http.server.reactive.AbstractHttpHandlerIntegrationTests;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.tests.TestSubscriber;
......@@ -135,14 +135,11 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
}
@RequestMapping("/sse/event")
Flux<SseEvent> sse() {
return Flux.interval(Duration.ofMillis(100)).map(l -> {
SseEvent event = new SseEvent();
event.setId(Long.toString(l));
event.setData("foo");
event.setComment("bar");
return event;
}).take(2);
Flux<ServerSentEvent<String>> sse() {
return Flux.interval(Duration.ofMillis(100)).map(l -> ServerSentEvent.builder("foo")
.id(Long.toString(l))
.comment("bar")
.build()).take(2);
}
}
......
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.codec;
import java.time.Duration;
import java.util.Optional;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
/**
* Representation for a Server-Sent Event for use with Spring's reactive Web
* support. {@code Flux<SseEvent>} or {@code Observable<SseEvent>} is the
* reactive equivalent to Spring MVC's {@code SseEmitter}.
*
* @param <T> the type of data that this event contains
* @author Sebastien Deleuze
* @author Arjen Poutsma
* @see ServerSentEventHttpMessageWriter
* @see <a href="https://www.w3.org/TR/eventsource/">Server-Sent Events W3C recommendation</a>
* @since 5.0
*/
public class ServerSentEvent<T> {
private final String id;
private final String event;
private final T data;
private final Duration retry;
private final String comment;
private ServerSentEvent(String id, String event, T data, Duration retry, String comment) {
this.id = id;
this.event = event;
this.data = data;
this.retry = retry;
this.comment = comment;
}
/**
* Return a builder for a {@code SseEvent}.
*
* @param <T> the type of data that this event contains
* @return the builder
*/
public static <T> Builder<T> builder() {
return new BuilderImpl<>();
}
/**
* Return a builder for a {@code SseEvent}, populated with the give {@linkplain #data() data}.
*
* @param <T> the type of data that this event contains
* @return the builder
*/
public static <T> Builder<T> builder(T data) {
return new BuilderImpl<>(data);
}
/**
* Return the {@code id} field of this event, if available.
*/
public Optional<String> id() {
return Optional.ofNullable(this.id);
}
/**
* Return the {@code event} field of this event, if available.
*/
public Optional<String> event() {
return Optional.ofNullable(this.event);
}
/**
* Return the {@code data} field of this event, if available.
*/
public Optional<T> data() {
return Optional.ofNullable(this.data);
}
/**
* Return the {@code retry} field of this event, if available.
*/
public Optional<Duration> retry() {
return Optional.ofNullable(this.retry);
}
/**
* Return the comment of this event, if available.
*/
public Optional<String> comment() {
return Optional.ofNullable(this.comment);
}
/**
* A mutable builder for a {@code SseEvent}.
*
* @param <T> the type of data that this event contains
*/
public interface Builder<T> {
/**
* Set the value of the {@code id} field.
*
* @param id the value of the id field
* @return {@code this} builder
*/
Builder<T> id(String id);
/**
* Set the value of the {@code event} field.
*
* @param event the value of the event field
* @return {@code this} builder
*/
Builder<T> event(String event);
/**
* Set the value of the {@code data} field. If the {@code data} argument is a multi-line {@code String}, it
* will be turned into multiple {@code data} field lines as defined in Server-Sent Events
* W3C recommendation. If {@code data} is not a String, it will be
* {@linkplain Jackson2JsonEncoder encoded} into JSON.
*
* @param data the value of the data field
* @return {@code this} builder
*/
Builder<T> data(T data);
/**
* Set the value of the {@code retry} field.
*
* @param retry the value of the retry field
* @return {@code this} builder
*/
Builder<T> retry(Duration retry);
/**
* Set SSE comment. If a multi-line comment is provided, it will be turned into multiple
* SSE comment lines as defined in Server-Sent Events W3C
* recommendation.
*
* @param comment the comment to set
* @return {@code this} builder
*/
Builder<T> comment(String comment);
/**
* Builds the event.
*
* @return the built event
*/
ServerSentEvent<T> build();
}
private static class BuilderImpl<T> implements Builder<T> {
private T data;
private String id;
private String event;
private Duration retry;
private String comment;
public BuilderImpl() {
}
public BuilderImpl(T data) {
this.data = data;
}
@Override
public Builder<T> id(String id) {
this.id = id;
return this;
}
@Override
public Builder<T> event(String event) {
this.event = event;
return this;
}
@Override
public Builder<T> data(T data) {
this.data = data;
return this;
}
@Override
public Builder<T> retry(Duration retry) {
this.retry = retry;
return this;
}
@Override
public Builder<T> comment(String comment) {
this.comment = comment;
return this;
}
@Override
public ServerSentEvent<T> build() {
return new ServerSentEvent<T>(this.id, this.event, this.data, this.retry, this.comment);
}
}
}
......@@ -33,24 +33,29 @@ import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.util.Assert;
import org.springframework.util.MimeTypeUtils;
/**
* Encoder that supports a stream of {@link SseEvent}s and also plain
* {@link Object}s which is the same as an {@link SseEvent} with data
* Encoder that supports a stream of {@link ServerSentEvent}s and also plain
* {@link Object}s which is the same as an {@link ServerSentEvent} with data
* only.
*
* @author Sebastien Deleuze
* @since 5.0
* @author Arjen Poutsma
* @since 5.0
*/
public class SseEventHttpMessageWriter implements HttpMessageWriter<Object> {
public class ServerSentEventHttpMessageWriter implements HttpMessageWriter<Object> {
private static final MediaType TEXT_EVENT_STREAM =
new MediaType("text", "event-stream");
private final List<Encoder<?>> dataEncoders;
public SseEventHttpMessageWriter(List<Encoder<?>> dataEncoders) {
public ServerSentEventHttpMessageWriter() {
this.dataEncoders = Collections.emptyList();
}
public ServerSentEventHttpMessageWriter(List<Encoder<?>> dataEncoders) {
Assert.notNull(dataEncoders, "'dataEncoders' must not be null");
this.dataEncoders = dataEncoders;
}
......@@ -67,7 +72,7 @@ public class SseEventHttpMessageWriter implements HttpMessageWriter<Object> {
@Override
public Mono<Void> write(Publisher<?> inputStream, ResolvableType type,
MediaType contentType, ReactiveHttpOutputMessage outputMessage) {
MediaType contentType, ReactiveHttpOutputMessage outputMessage) {
outputMessage.getHeaders().setContentType(TEXT_EVENT_STREAM);
......@@ -82,68 +87,60 @@ public class SseEventHttpMessageWriter implements HttpMessageWriter<Object> {
}
private Flux<Publisher<DataBuffer>> encode(Publisher<?> inputStream,
DataBufferFactory bufferFactory, ResolvableType type) {
return Flux.from(inputStream).map(input -> {
SseEvent event =
(SseEvent.class.equals(type.getRawClass()) ? (SseEvent) input :
new SseEvent(input));
StringBuilder sb = new StringBuilder();
if (event.getId() != null) {
sb.append("id:");
sb.append(event.getId());
sb.append("\n");
}
if (event.getName() != null) {
sb.append("event:");
sb.append(event.getName());
sb.append("\n");
}
if (event.getReconnectTime() != null) {
sb.append("retry:");
sb.append(event.getReconnectTime().toString());
sb.append("\n");
}
if (event.getComment() != null) {
sb.append(":");
sb.append(event.getComment().replaceAll("\\n", "\n:"));
sb.append("\n");
}
Object data = event.getData();
Flux<DataBuffer> dataBuffer = Flux.empty();
MediaType mediaType =
(event.getMediaType() == null ? MediaType.ALL : event.getMediaType());
if (data != null) {
sb.append("data:");
if (data instanceof String) {
sb.append(((String) data).replaceAll("\\n", "\ndata:")).append("\n");
}
else {
dataBuffer = applyEncoder(data, mediaType, bufferFactory);
}
}
return Flux.concat(encodeString(sb.toString(), bufferFactory), dataBuffer,
encodeString("\n", bufferFactory));
});
DataBufferFactory bufferFactory, ResolvableType type) {
return Flux.from(inputStream)
.map(o -> toSseEvent(o, type))
.map(sse -> {
StringBuilder sb = new StringBuilder();
sse.id().ifPresent(id -> writeField("id", id, sb));
sse.event().ifPresent(event -> writeField("event", event, sb));
sse.retry().ifPresent(retry -> writeField("retry", retry.toMillis(), sb));
sse.comment().ifPresent(comment -> {
comment = comment.replaceAll("\\n", "\n:");
sb.append(':').append(comment).append("\n");
});
Flux<DataBuffer> dataBuffer = sse.data()
.<Flux<DataBuffer>>map(data -> {
sb.append("data:");
if (data instanceof String) {
String stringData = ((String) data).replaceAll("\\n", "\ndata:");
sb.append(stringData).append('\n');
return Flux.empty();
}
else {
return applyEncoder(data, bufferFactory);
}
}).orElse(Flux.empty());
return Flux.concat(encodeString(sb.toString(), bufferFactory), dataBuffer,
encodeString("\n", bufferFactory));
});
}
private ServerSentEvent<?> toSseEvent(Object data, ResolvableType type) {
return ServerSentEvent.class.isAssignableFrom(type.getRawClass())
? (ServerSentEvent<?>) data
: ServerSentEvent.builder().data(data).build();
}
private void writeField(String fieldName, Object fieldValue, StringBuilder stringBuilder) {
stringBuilder.append(fieldName);
stringBuilder.append(':');
stringBuilder.append(fieldValue.toString());
stringBuilder.append("\n");
}
@SuppressWarnings("unchecked")
private <T> Flux<DataBuffer> applyEncoder(Object data, MediaType mediaType, DataBufferFactory bufferFactory) {
private <T> Flux<DataBuffer> applyEncoder(Object data, DataBufferFactory bufferFactory) {
ResolvableType elementType = ResolvableType.forClass(data.getClass());
Optional<Encoder<?>> encoder = dataEncoders
.stream()
.filter(e -> e.canEncode(elementType, mediaType))
.findFirst();
.stream()
.filter(e -> e.canEncode(elementType, MimeTypeUtils.APPLICATION_JSON))
.findFirst();
return ((Encoder<T>) encoder.orElseThrow(() -> new CodecException("No suitable encoder found!")))
.encode(Mono.just((T) data), bufferFactory, elementType, mediaType)
.encode(Mono.just((T) data), bufferFactory, elementType, MimeTypeUtils.APPLICATION_JSON)
.concatWith(encodeString("\n", bufferFactory));
}
......
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.codec;
import org.springframework.http.MediaType;
/**
* Representation for a Server-Sent Event for use with Spring's reactive Web
* support. {@code Flux<SseEvent>} or {@code Observable<SseEvent>} is the
* reactive equivalent to Spring MVC's {@code SseEmitter}.
*
* @author Sebastien Deleuze
* @since 5.0
* @see SseEventHttpMessageWriter
* @see <a href="https://www.w3.org/TR/eventsource/">Server-Sent Events W3C recommendation</a>
*/
public class SseEvent {
private String id;
private String name;
private Object data;
private MediaType mediaType;
private Long reconnectTime;
private String comment;
/**
* Create an empty instance.
*/
public SseEvent() {
}
/**
* Create an instance with the provided {@code data}.
*/
public SseEvent(Object data) {
this.data = data;
}
/**
* Create an instance with the provided {@code data} and {@code mediaType}.
*/
public SseEvent(Object data, MediaType mediaType) {
this.data = data;
this.mediaType = mediaType;
}
/**
* Set the {@code id} SSE field
*/
public void setId(String id) {
this.id = id;
}
/**
* @see #setId(String)
*/
public String getId() {
return id;
}
/**
* Set the {@code event} SSE field
*/
public void setName(String name) {
this.name = name;
}
/**
* @see #setName(String)
*/
public String getName() {
return name;
}
/**
* Set {@code data} SSE field. If a multiline {@code String} is provided, it will be
* turned into multiple {@code data} field lines as defined in Server-Sent Events
* W3C recommendation.
*
* If no {@code mediaType} is defined, default {@link SseEventHttpMessageWriter} will:
* - Turn single line {@code String} to a single {@code data} field
* - Turn multiline line {@code String} to multiple {@code data} fields
* - Serialize other {@code Object} as JSON
*
* @see #setMediaType(MediaType)
*/
public void setData(Object data) {
this.data = data;
}
/**
* @see #setData(Object)
*/
public Object getData() {
return data;
}
/**
* Set the {@link MediaType} used to serialize the {@code data}.
* {@link SseEventHttpMessageWriter} should be configured with the relevant encoder to be
* able to serialize it.
*/
public void setMediaType(MediaType mediaType) {
this.mediaType = mediaType;
}
/**
* @see #setMediaType(MediaType)
*/
public MediaType getMediaType() {
return this.mediaType;
}
/**
* Set the {@code retry} SSE field
*/
public void setReconnectTime(Long reconnectTime) {
this.reconnectTime = reconnectTime;
}
/**
* @see #setReconnectTime(Long)
*/
public Long getReconnectTime() {
return reconnectTime;
}
/**
* Set SSE comment. If a multiline comment is provided, it will be turned into multiple
* SSE comment lines by {@link SseEventHttpMessageWriter} as defined in Server-Sent Events W3C
* recommendation.
*/
public void setComment(String comment) {
this.comment = comment;
}
/**
* @see #setComment(String)
*/
public String getComment() {
return comment;
}
}
......@@ -16,6 +16,7 @@
package org.springframework.http.codec;
import java.time.Duration;
import java.util.Collections;
import org.junit.Test;
......@@ -31,44 +32,43 @@ import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.http.server.reactive.MockServerHttpResponse;
import org.springframework.tests.TestSubscriber;
import static org.junit.Assert.*;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* @author Sebastien Deleuze
*/
public class SseEventHttpMessageWriterTests extends AbstractDataBufferAllocatingTestCase {
public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAllocatingTestCase {
private SseEventHttpMessageWriter converter = new SseEventHttpMessageWriter(
private ServerSentEventHttpMessageWriter messageWriter = new ServerSentEventHttpMessageWriter(
Collections.singletonList(new Jackson2JsonEncoder()));
@Test
public void nullMimeType() {
assertTrue(converter.canWrite(ResolvableType.forClass(Object.class), null));
assertTrue(messageWriter.canWrite(ResolvableType.forClass(Object.class), null));
}
@Test
public void unsupportedMimeType() {
assertFalse(converter.canWrite(ResolvableType.forClass(Object.class),
assertFalse(messageWriter.canWrite(ResolvableType.forClass(Object.class),
new MediaType("foo", "bar")));
}
@Test
public void supportedMimeType() {
assertTrue(converter.canWrite(ResolvableType.forClass(Object.class),
assertTrue(messageWriter.canWrite(ResolvableType.forClass(Object.class),
new MediaType("text", "event-stream")));
}
@Test
public void encodeServerSentEvent() {
SseEvent event = new SseEvent();
event.setId("c42");
event.setName("foo");
event.setComment("bla\nbla bla\nbla bla bla");
event.setReconnectTime(123L);
Mono<SseEvent> source = Mono.just(event);
ServerSentEvent<String>
event = ServerSentEvent.<String>builder().data("bar").id("c42").event("foo").comment("bla\nbla bla\nbla bla bla")
.retry(Duration.ofMillis(123L)).build();
Mono<ServerSentEvent<String>> source = Mono.just(event);
MockServerHttpResponse outputMessage = new MockServerHttpResponse();
converter.write(source, ResolvableType.forClass(SseEvent.class),
messageWriter.write(source, ResolvableType.forClass(ServerSentEvent.class),
new MediaType("text", "event-stream"), outputMessage);
Publisher<Publisher<DataBuffer>> result = outputMessage.getBodyWithFlush();
......@@ -77,7 +77,8 @@ public class SseEventHttpMessageWriterTests extends AbstractDataBufferAllocating
assertValuesWith(publisher -> {
TestSubscriber.subscribe(publisher).assertNoError().assertValuesWith(
stringConsumer("id:c42\n" + "event:foo\n" + "retry:123\n" +
":bla\n:bla bla\n:bla bla bla\n"),
":bla\n:bla bla\n:bla bla bla\n" +
"data:bar\n"),
stringConsumer("\n"));
});
......@@ -87,7 +88,7 @@ public class SseEventHttpMessageWriterTests extends AbstractDataBufferAllocating
public void encodeString() {
Flux<String> source = Flux.just("foo", "bar");
MockServerHttpResponse outputMessage = new MockServerHttpResponse();
converter.write(source, ResolvableType.forClass(String.class),
messageWriter.write(source, ResolvableType.forClass(String.class),
new MediaType("text", "event-stream"), outputMessage);
Publisher<Publisher<DataBuffer>> result = outputMessage.getBodyWithFlush();
......@@ -110,7 +111,7 @@ public class SseEventHttpMessageWriterTests extends AbstractDataBufferAllocating
public void encodeMultiLineString() {
Flux<String> source = Flux.just("foo\nbar", "foo\nbaz");
MockServerHttpResponse outputMessage = new MockServerHttpResponse();
converter.write(source, ResolvableType.forClass(String.class),
messageWriter.write(source, ResolvableType.forClass(String.class),
new MediaType("text", "event-stream"), outputMessage);
Publisher<Publisher<DataBuffer>> result = outputMessage.getBodyWithFlush();
......@@ -134,7 +135,7 @@ public class SseEventHttpMessageWriterTests extends AbstractDataBufferAllocating
Flux<Pojo> source = Flux.just(new Pojo("foofoo", "barbar"),
new Pojo("foofoofoo", "barbarbar"));
MockServerHttpResponse outputMessage = new MockServerHttpResponse();
converter.write(source, ResolvableType.forClass(Pojo.class),
messageWriter.write(source, ResolvableType.forClass(Pojo.class),
new MediaType("text", "event-stream"), outputMessage);
Publisher<Publisher<DataBuffer>> result = outputMessage.getBodyWithFlush();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册