diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/config/WebReactiveConfiguration.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/config/WebReactiveConfiguration.java index d262dfcd906486b80f14001a8df3f9c3482385fc..303aac307546d3122bb18e3edab95234aee7ca77 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/config/WebReactiveConfiguration.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/config/WebReactiveConfiguration.java @@ -44,7 +44,7 @@ import org.springframework.http.codec.EncoderHttpMessageWriter; import org.springframework.http.codec.HttpMessageReader; import org.springframework.http.codec.HttpMessageWriter; import org.springframework.http.codec.ResourceHttpMessageWriter; -import org.springframework.http.codec.SseEventHttpMessageWriter; +import org.springframework.http.codec.ServerSentEventHttpMessageWriter; import org.springframework.http.codec.json.Jackson2JsonDecoder; import org.springframework.http.codec.json.Jackson2JsonEncoder; import org.springframework.http.codec.xml.Jaxb2XmlDecoder; @@ -372,7 +372,7 @@ public class WebReactiveConfiguration implements ApplicationContextAware { writers.add(new EncoderHttpMessageWriter<>(jacksonEncoder)); sseDataEncoders.add(jacksonEncoder); } - writers.add(new SseEventHttpMessageWriter(sseDataEncoders)); + writers.add(new ServerSentEventHttpMessageWriter(sseDataEncoders)); } /** * Override this to modify the list of message writers after it has been diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java index 559eafc9d3cc7c0d6111508f7608d51e4e572e6d..3e3d5e43235022477b2d30f25bb635a58ffb6f24 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java @@ -28,7 +28,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.MediaType; import org.springframework.http.client.reactive.ReactorClientHttpConnector; -import org.springframework.http.codec.SseEvent; +import org.springframework.http.codec.ServerSentEvent; import org.springframework.http.server.reactive.AbstractHttpHandlerIntegrationTests; import org.springframework.http.server.reactive.HttpHandler; import org.springframework.tests.TestSubscriber; @@ -135,14 +135,11 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { } @RequestMapping("/sse/event") - Flux sse() { - return Flux.interval(Duration.ofMillis(100)).map(l -> { - SseEvent event = new SseEvent(); - event.setId(Long.toString(l)); - event.setData("foo"); - event.setComment("bar"); - return event; - }).take(2); + Flux> sse() { + return Flux.interval(Duration.ofMillis(100)).map(l -> ServerSentEvent.builder("foo") + .id(Long.toString(l)) + .comment("bar") + .build()).take(2); } } diff --git a/spring-web/src/main/java/org/springframework/http/codec/ServerSentEvent.java b/spring-web/src/main/java/org/springframework/http/codec/ServerSentEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..9f24bd6490d08d20b8b3029af9db4ec3acf399d7 --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/codec/ServerSentEvent.java @@ -0,0 +1,226 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.codec; + +import java.time.Duration; +import java.util.Optional; + +import org.springframework.http.codec.json.Jackson2JsonEncoder; + +/** + * Representation for a Server-Sent Event for use with Spring's reactive Web + * support. {@code Flux} or {@code Observable} is the + * reactive equivalent to Spring MVC's {@code SseEmitter}. + * + * @param the type of data that this event contains + * @author Sebastien Deleuze + * @author Arjen Poutsma + * @see ServerSentEventHttpMessageWriter + * @see Server-Sent Events W3C recommendation + * @since 5.0 + */ +public class ServerSentEvent { + + private final String id; + + private final String event; + + private final T data; + + private final Duration retry; + + private final String comment; + + private ServerSentEvent(String id, String event, T data, Duration retry, String comment) { + this.id = id; + this.event = event; + this.data = data; + this.retry = retry; + this.comment = comment; + } + + /** + * Return a builder for a {@code SseEvent}. + * + * @param the type of data that this event contains + * @return the builder + */ + public static Builder builder() { + return new BuilderImpl<>(); + } + + /** + * Return a builder for a {@code SseEvent}, populated with the give {@linkplain #data() data}. + * + * @param the type of data that this event contains + * @return the builder + */ + public static Builder builder(T data) { + return new BuilderImpl<>(data); + } + + /** + * Return the {@code id} field of this event, if available. + */ + public Optional id() { + return Optional.ofNullable(this.id); + } + + /** + * Return the {@code event} field of this event, if available. + */ + public Optional event() { + return Optional.ofNullable(this.event); + } + + /** + * Return the {@code data} field of this event, if available. + */ + public Optional data() { + return Optional.ofNullable(this.data); + } + + /** + * Return the {@code retry} field of this event, if available. + */ + public Optional retry() { + return Optional.ofNullable(this.retry); + } + + /** + * Return the comment of this event, if available. + */ + public Optional comment() { + return Optional.ofNullable(this.comment); + } + + /** + * A mutable builder for a {@code SseEvent}. + * + * @param the type of data that this event contains + */ + public interface Builder { + + /** + * Set the value of the {@code id} field. + * + * @param id the value of the id field + * @return {@code this} builder + */ + Builder id(String id); + + /** + * Set the value of the {@code event} field. + * + * @param event the value of the event field + * @return {@code this} builder + */ + Builder event(String event); + + /** + * Set the value of the {@code data} field. If the {@code data} argument is a multi-line {@code String}, it + * will be turned into multiple {@code data} field lines as defined in Server-Sent Events + * W3C recommendation. If {@code data} is not a String, it will be + * {@linkplain Jackson2JsonEncoder encoded} into JSON. + * + * @param data the value of the data field + * @return {@code this} builder + */ + Builder data(T data); + + /** + * Set the value of the {@code retry} field. + * + * @param retry the value of the retry field + * @return {@code this} builder + */ + Builder retry(Duration retry); + + /** + * Set SSE comment. If a multi-line comment is provided, it will be turned into multiple + * SSE comment lines as defined in Server-Sent Events W3C + * recommendation. + * + * @param comment the comment to set + * @return {@code this} builder + */ + Builder comment(String comment); + + /** + * Builds the event. + * + * @return the built event + */ + ServerSentEvent build(); + } + + private static class BuilderImpl implements Builder { + + private T data; + + private String id; + + private String event; + + private Duration retry; + + private String comment; + + public BuilderImpl() { + } + + public BuilderImpl(T data) { + this.data = data; + } + + @Override + public Builder id(String id) { + this.id = id; + return this; + } + + @Override + public Builder event(String event) { + this.event = event; + return this; + } + + @Override + public Builder data(T data) { + this.data = data; + return this; + } + + @Override + public Builder retry(Duration retry) { + this.retry = retry; + return this; + } + + @Override + public Builder comment(String comment) { + this.comment = comment; + return this; + } + + @Override + public ServerSentEvent build() { + return new ServerSentEvent(this.id, this.event, this.data, this.retry, this.comment); + } + } + +} diff --git a/spring-web/src/main/java/org/springframework/http/codec/SseEventHttpMessageWriter.java b/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageWriter.java similarity index 56% rename from spring-web/src/main/java/org/springframework/http/codec/SseEventHttpMessageWriter.java rename to spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageWriter.java index 103a1f27e9b3b19ea7ba9e3c81a3731bbe075bdb..2568ad11f2707af698d1afd52ec4ead9a4c7e690 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/SseEventHttpMessageWriter.java +++ b/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageWriter.java @@ -33,24 +33,29 @@ import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.http.MediaType; import org.springframework.http.ReactiveHttpOutputMessage; import org.springframework.util.Assert; +import org.springframework.util.MimeTypeUtils; /** - * Encoder that supports a stream of {@link SseEvent}s and also plain - * {@link Object}s which is the same as an {@link SseEvent} with data + * Encoder that supports a stream of {@link ServerSentEvent}s and also plain + * {@link Object}s which is the same as an {@link ServerSentEvent} with data * only. * * @author Sebastien Deleuze - * @since 5.0 * @author Arjen Poutsma + * @since 5.0 */ -public class SseEventHttpMessageWriter implements HttpMessageWriter { +public class ServerSentEventHttpMessageWriter implements HttpMessageWriter { private static final MediaType TEXT_EVENT_STREAM = new MediaType("text", "event-stream"); private final List> dataEncoders; - public SseEventHttpMessageWriter(List> dataEncoders) { + public ServerSentEventHttpMessageWriter() { + this.dataEncoders = Collections.emptyList(); + } + + public ServerSentEventHttpMessageWriter(List> dataEncoders) { Assert.notNull(dataEncoders, "'dataEncoders' must not be null"); this.dataEncoders = dataEncoders; } @@ -67,7 +72,7 @@ public class SseEventHttpMessageWriter implements HttpMessageWriter { @Override public Mono write(Publisher inputStream, ResolvableType type, - MediaType contentType, ReactiveHttpOutputMessage outputMessage) { + MediaType contentType, ReactiveHttpOutputMessage outputMessage) { outputMessage.getHeaders().setContentType(TEXT_EVENT_STREAM); @@ -82,68 +87,60 @@ public class SseEventHttpMessageWriter implements HttpMessageWriter { } private Flux> encode(Publisher inputStream, - DataBufferFactory bufferFactory, ResolvableType type) { - - return Flux.from(inputStream).map(input -> { - SseEvent event = - (SseEvent.class.equals(type.getRawClass()) ? (SseEvent) input : - new SseEvent(input)); - - StringBuilder sb = new StringBuilder(); - - if (event.getId() != null) { - sb.append("id:"); - sb.append(event.getId()); - sb.append("\n"); - } - - if (event.getName() != null) { - sb.append("event:"); - sb.append(event.getName()); - sb.append("\n"); - } - - if (event.getReconnectTime() != null) { - sb.append("retry:"); - sb.append(event.getReconnectTime().toString()); - sb.append("\n"); - } - - if (event.getComment() != null) { - sb.append(":"); - sb.append(event.getComment().replaceAll("\\n", "\n:")); - sb.append("\n"); - } - - Object data = event.getData(); - Flux dataBuffer = Flux.empty(); - MediaType mediaType = - (event.getMediaType() == null ? MediaType.ALL : event.getMediaType()); - if (data != null) { - sb.append("data:"); - if (data instanceof String) { - sb.append(((String) data).replaceAll("\\n", "\ndata:")).append("\n"); - } - else { - dataBuffer = applyEncoder(data, mediaType, bufferFactory); - } - } - - return Flux.concat(encodeString(sb.toString(), bufferFactory), dataBuffer, - encodeString("\n", bufferFactory)); - }); + DataBufferFactory bufferFactory, ResolvableType type) { + + return Flux.from(inputStream) + .map(o -> toSseEvent(o, type)) + .map(sse -> { + StringBuilder sb = new StringBuilder(); + sse.id().ifPresent(id -> writeField("id", id, sb)); + sse.event().ifPresent(event -> writeField("event", event, sb)); + sse.retry().ifPresent(retry -> writeField("retry", retry.toMillis(), sb)); + sse.comment().ifPresent(comment -> { + comment = comment.replaceAll("\\n", "\n:"); + sb.append(':').append(comment).append("\n"); + }); + Flux dataBuffer = sse.data() + .>map(data -> { + sb.append("data:"); + if (data instanceof String) { + String stringData = ((String) data).replaceAll("\\n", "\ndata:"); + sb.append(stringData).append('\n'); + return Flux.empty(); + } + else { + return applyEncoder(data, bufferFactory); + } + }).orElse(Flux.empty()); + + return Flux.concat(encodeString(sb.toString(), bufferFactory), dataBuffer, + encodeString("\n", bufferFactory)); + }); + + } + + private ServerSentEvent toSseEvent(Object data, ResolvableType type) { + return ServerSentEvent.class.isAssignableFrom(type.getRawClass()) + ? (ServerSentEvent) data + : ServerSentEvent.builder().data(data).build(); + } + private void writeField(String fieldName, Object fieldValue, StringBuilder stringBuilder) { + stringBuilder.append(fieldName); + stringBuilder.append(':'); + stringBuilder.append(fieldValue.toString()); + stringBuilder.append("\n"); } @SuppressWarnings("unchecked") - private Flux applyEncoder(Object data, MediaType mediaType, DataBufferFactory bufferFactory) { + private Flux applyEncoder(Object data, DataBufferFactory bufferFactory) { ResolvableType elementType = ResolvableType.forClass(data.getClass()); Optional> encoder = dataEncoders - .stream() - .filter(e -> e.canEncode(elementType, mediaType)) - .findFirst(); + .stream() + .filter(e -> e.canEncode(elementType, MimeTypeUtils.APPLICATION_JSON)) + .findFirst(); return ((Encoder) encoder.orElseThrow(() -> new CodecException("No suitable encoder found!"))) - .encode(Mono.just((T) data), bufferFactory, elementType, mediaType) + .encode(Mono.just((T) data), bufferFactory, elementType, MimeTypeUtils.APPLICATION_JSON) .concatWith(encodeString("\n", bufferFactory)); } diff --git a/spring-web/src/main/java/org/springframework/http/codec/SseEvent.java b/spring-web/src/main/java/org/springframework/http/codec/SseEvent.java deleted file mode 100644 index ddbc5ae7b0db342cfcac836c31423883ab5d1017..0000000000000000000000000000000000000000 --- a/spring-web/src/main/java/org/springframework/http/codec/SseEvent.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Copyright 2002-2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.http.codec; - -import org.springframework.http.MediaType; - -/** - * Representation for a Server-Sent Event for use with Spring's reactive Web - * support. {@code Flux} or {@code Observable} is the - * reactive equivalent to Spring MVC's {@code SseEmitter}. - * - * @author Sebastien Deleuze - * @since 5.0 - * @see SseEventHttpMessageWriter - * @see Server-Sent Events W3C recommendation - */ -public class SseEvent { - - private String id; - - private String name; - - private Object data; - - private MediaType mediaType; - - private Long reconnectTime; - - private String comment; - - - /** - * Create an empty instance. - */ - public SseEvent() { - } - - /** - * Create an instance with the provided {@code data}. - */ - public SseEvent(Object data) { - this.data = data; - } - - /** - * Create an instance with the provided {@code data} and {@code mediaType}. - */ - public SseEvent(Object data, MediaType mediaType) { - this.data = data; - this.mediaType = mediaType; - } - - - /** - * Set the {@code id} SSE field - */ - public void setId(String id) { - this.id = id; - } - - /** - * @see #setId(String) - */ - public String getId() { - return id; - } - - /** - * Set the {@code event} SSE field - */ - public void setName(String name) { - this.name = name; - } - - /** - * @see #setName(String) - */ - public String getName() { - return name; - } - - /** - * Set {@code data} SSE field. If a multiline {@code String} is provided, it will be - * turned into multiple {@code data} field lines as defined in Server-Sent Events - * W3C recommendation. - * - * If no {@code mediaType} is defined, default {@link SseEventHttpMessageWriter} will: - * - Turn single line {@code String} to a single {@code data} field - * - Turn multiline line {@code String} to multiple {@code data} fields - * - Serialize other {@code Object} as JSON - * - * @see #setMediaType(MediaType) - */ - public void setData(Object data) { - this.data = data; - } - - /** - * @see #setData(Object) - */ - public Object getData() { - return data; - } - - /** - * Set the {@link MediaType} used to serialize the {@code data}. - * {@link SseEventHttpMessageWriter} should be configured with the relevant encoder to be - * able to serialize it. - */ - public void setMediaType(MediaType mediaType) { - this.mediaType = mediaType; - } - - /** - * @see #setMediaType(MediaType) - */ - public MediaType getMediaType() { - return this.mediaType; - } - - /** - * Set the {@code retry} SSE field - */ - public void setReconnectTime(Long reconnectTime) { - this.reconnectTime = reconnectTime; - } - - /** - * @see #setReconnectTime(Long) - */ - public Long getReconnectTime() { - return reconnectTime; - } - - /** - * Set SSE comment. If a multiline comment is provided, it will be turned into multiple - * SSE comment lines by {@link SseEventHttpMessageWriter} as defined in Server-Sent Events W3C - * recommendation. - */ - public void setComment(String comment) { - this.comment = comment; - } - - /** - * @see #setComment(String) - */ - public String getComment() { - return comment; - } - -} diff --git a/spring-web/src/test/java/org/springframework/http/codec/SseEventHttpMessageWriterTests.java b/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageWriterTests.java similarity index 79% rename from spring-web/src/test/java/org/springframework/http/codec/SseEventHttpMessageWriterTests.java rename to spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageWriterTests.java index 24d8bf5068120325cdb7d1363b1ef1fca61396a2..2fdd65ded01b3ea47af65f4121929de198b08dcf 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/SseEventHttpMessageWriterTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageWriterTests.java @@ -16,6 +16,7 @@ package org.springframework.http.codec; +import java.time.Duration; import java.util.Collections; import org.junit.Test; @@ -31,44 +32,43 @@ import org.springframework.http.codec.json.Jackson2JsonEncoder; import org.springframework.http.server.reactive.MockServerHttpResponse; import org.springframework.tests.TestSubscriber; -import static org.junit.Assert.*; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; /** * @author Sebastien Deleuze */ -public class SseEventHttpMessageWriterTests extends AbstractDataBufferAllocatingTestCase { +public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAllocatingTestCase { - private SseEventHttpMessageWriter converter = new SseEventHttpMessageWriter( + private ServerSentEventHttpMessageWriter messageWriter = new ServerSentEventHttpMessageWriter( Collections.singletonList(new Jackson2JsonEncoder())); @Test public void nullMimeType() { - assertTrue(converter.canWrite(ResolvableType.forClass(Object.class), null)); + assertTrue(messageWriter.canWrite(ResolvableType.forClass(Object.class), null)); } @Test public void unsupportedMimeType() { - assertFalse(converter.canWrite(ResolvableType.forClass(Object.class), + assertFalse(messageWriter.canWrite(ResolvableType.forClass(Object.class), new MediaType("foo", "bar"))); } @Test public void supportedMimeType() { - assertTrue(converter.canWrite(ResolvableType.forClass(Object.class), + assertTrue(messageWriter.canWrite(ResolvableType.forClass(Object.class), new MediaType("text", "event-stream"))); } @Test public void encodeServerSentEvent() { - SseEvent event = new SseEvent(); - event.setId("c42"); - event.setName("foo"); - event.setComment("bla\nbla bla\nbla bla bla"); - event.setReconnectTime(123L); - Mono source = Mono.just(event); + ServerSentEvent + event = ServerSentEvent.builder().data("bar").id("c42").event("foo").comment("bla\nbla bla\nbla bla bla") + .retry(Duration.ofMillis(123L)).build(); + Mono> source = Mono.just(event); MockServerHttpResponse outputMessage = new MockServerHttpResponse(); - converter.write(source, ResolvableType.forClass(SseEvent.class), + messageWriter.write(source, ResolvableType.forClass(ServerSentEvent.class), new MediaType("text", "event-stream"), outputMessage); Publisher> result = outputMessage.getBodyWithFlush(); @@ -77,7 +77,8 @@ public class SseEventHttpMessageWriterTests extends AbstractDataBufferAllocating assertValuesWith(publisher -> { TestSubscriber.subscribe(publisher).assertNoError().assertValuesWith( stringConsumer("id:c42\n" + "event:foo\n" + "retry:123\n" + - ":bla\n:bla bla\n:bla bla bla\n"), + ":bla\n:bla bla\n:bla bla bla\n" + + "data:bar\n"), stringConsumer("\n")); }); @@ -87,7 +88,7 @@ public class SseEventHttpMessageWriterTests extends AbstractDataBufferAllocating public void encodeString() { Flux source = Flux.just("foo", "bar"); MockServerHttpResponse outputMessage = new MockServerHttpResponse(); - converter.write(source, ResolvableType.forClass(String.class), + messageWriter.write(source, ResolvableType.forClass(String.class), new MediaType("text", "event-stream"), outputMessage); Publisher> result = outputMessage.getBodyWithFlush(); @@ -110,7 +111,7 @@ public class SseEventHttpMessageWriterTests extends AbstractDataBufferAllocating public void encodeMultiLineString() { Flux source = Flux.just("foo\nbar", "foo\nbaz"); MockServerHttpResponse outputMessage = new MockServerHttpResponse(); - converter.write(source, ResolvableType.forClass(String.class), + messageWriter.write(source, ResolvableType.forClass(String.class), new MediaType("text", "event-stream"), outputMessage); Publisher> result = outputMessage.getBodyWithFlush(); @@ -134,7 +135,7 @@ public class SseEventHttpMessageWriterTests extends AbstractDataBufferAllocating Flux source = Flux.just(new Pojo("foofoo", "barbar"), new Pojo("foofoofoo", "barbarbar")); MockServerHttpResponse outputMessage = new MockServerHttpResponse(); - converter.write(source, ResolvableType.forClass(Pojo.class), + messageWriter.write(source, ResolvableType.forClass(Pojo.class), new MediaType("text", "event-stream"), outputMessage); Publisher> result = outputMessage.getBodyWithFlush();