提交 a1320cd4 编写于 作者: A Arjen Poutsma

Add SSE support to WebMvc.fn

This commit adds support for sending Server-Sent Events in WebMvc.fn,
through the ServerResponse.sse method that takes a SseBuilder DSL.
It also includes reference documentation.

Closes gh-25920
上级 c7f2f50c
/*
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server;
import java.io.IOException;
import java.io.OutputStream;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.util.Assert;
/**
* Implementation of {@code ServerHttpResponse} that delegates all calls to a
* given target {@code ServerHttpResponse}.
*
* @author Arjen Poutsma
* @since 5.3.2
*/
public class DelegatingServerHttpResponse implements ServerHttpResponse {
private final ServerHttpResponse delegate;
/**
* Create a new {@code DelegatingServerHttpResponse}.
* @param delegate the response to delegate to
*/
public DelegatingServerHttpResponse(ServerHttpResponse delegate) {
Assert.notNull(delegate, "Delegate must not be null");
this.delegate = delegate;
}
/**
* Returns the target response that this response delegates to.
* @return the delegate
*/
public ServerHttpResponse getDelegate() {
return this.delegate;
}
@Override
public void setStatusCode(HttpStatus status) {
this.delegate.setStatusCode(status);
}
@Override
public void flush() throws IOException {
this.delegate.flush();
}
@Override
public void close() {
this.delegate.close();
}
@Override
public OutputStream getBody() throws IOException {
return this.delegate.getBody();
}
@Override
public HttpHeaders getHeaders() {
return this.delegate.getHeaders();
}
}
/*
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.servlet.function;
import java.io.IOException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Set;
import javax.servlet.ServletException;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.lang.Nullable;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.context.request.ServletWebRequest;
import org.springframework.web.servlet.ModelAndView;
/**
* Abstract base class for {@link ServerResponse} implementations.
*
* @author Arjen Poutsma
* @since 5.3.2
*/
abstract class AbstractServerResponse extends ErrorHandlingServerResponse {
private static final Set<HttpMethod> SAFE_METHODS = EnumSet.of(HttpMethod.GET, HttpMethod.HEAD);
final int statusCode;
private final HttpHeaders headers;
private final MultiValueMap<String, Cookie> cookies;
protected AbstractServerResponse(
int statusCode, HttpHeaders headers, MultiValueMap<String, Cookie> cookies) {
this.statusCode = statusCode;
this.headers = HttpHeaders.readOnlyHttpHeaders(headers);
this.cookies =
CollectionUtils.unmodifiableMultiValueMap(new LinkedMultiValueMap<>(cookies));
}
@Override
public final HttpStatus statusCode() {
return HttpStatus.valueOf(this.statusCode);
}
@Override
public int rawStatusCode() {
return this.statusCode;
}
@Override
public final HttpHeaders headers() {
return this.headers;
}
@Override
public MultiValueMap<String, Cookie> cookies() {
return this.cookies;
}
@Override
public ModelAndView writeTo(HttpServletRequest request, HttpServletResponse response,
Context context) throws ServletException, IOException {
try {
writeStatusAndHeaders(response);
long lastModified = headers().getLastModified();
ServletWebRequest servletWebRequest = new ServletWebRequest(request, response);
HttpMethod httpMethod = HttpMethod.resolve(request.getMethod());
if (SAFE_METHODS.contains(httpMethod) &&
servletWebRequest.checkNotModified(headers().getETag(), lastModified)) {
return null;
}
else {
return writeToInternal(request, response, context);
}
}
catch (Throwable throwable) {
return handleError(throwable, request, response, context);
}
}
private void writeStatusAndHeaders(HttpServletResponse response) {
response.setStatus(this.statusCode);
writeHeaders(response);
writeCookies(response);
}
private void writeHeaders(HttpServletResponse servletResponse) {
this.headers.forEach((headerName, headerValues) -> {
for (String headerValue : headerValues) {
servletResponse.addHeader(headerName, headerValue);
}
});
// HttpServletResponse exposes some headers as properties: we should include those if not already present
if (servletResponse.getContentType() == null && this.headers.getContentType() != null) {
servletResponse.setContentType(this.headers.getContentType().toString());
}
if (servletResponse.getCharacterEncoding() == null &&
this.headers.getContentType() != null &&
this.headers.getContentType().getCharset() != null) {
servletResponse.setCharacterEncoding(this.headers.getContentType().getCharset().name());
}
}
private void writeCookies(HttpServletResponse servletResponse) {
this.cookies.values().stream()
.flatMap(Collection::stream)
.forEach(servletResponse::addCookie);
}
@Nullable
protected abstract ModelAndView writeToInternal(
HttpServletRequest request, HttpServletResponse response, Context context)
throws ServletException, IOException;
}
......@@ -237,8 +237,7 @@ final class DefaultEntityResponseBuilder<T> implements EntityResponse.Builder<T>
/**
* Default {@link EntityResponse} implementation for synchronous bodies.
*/
private static class DefaultEntityResponse<T> extends DefaultServerResponseBuilder.AbstractServerResponse
implements EntityResponse<T> {
private static class DefaultEntityResponse<T> extends AbstractServerResponse implements EntityResponse<T> {
private final T entity;
......
......@@ -150,8 +150,7 @@ final class DefaultRenderingResponseBuilder implements RenderingResponse.Builder
}
private static final class DefaultRenderingResponse extends DefaultServerResponseBuilder.AbstractServerResponse
implements RenderingResponse {
private static final class DefaultRenderingResponse extends AbstractServerResponse implements RenderingResponse {
private final String name;
......
......@@ -16,20 +16,16 @@
package org.springframework.web.servlet.function;
import java.io.IOException;
import java.net.URI;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import javax.servlet.ServletException;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
......@@ -40,12 +36,9 @@ import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.context.request.ServletWebRequest;
import org.springframework.web.servlet.ModelAndView;
/**
......@@ -224,111 +217,6 @@ class DefaultServerResponseBuilder implements ServerResponse.BodyBuilder {
}
/**
* Abstract base class for {@link ServerResponse} implementations.
*/
abstract static class AbstractServerResponse extends ErrorHandlingServerResponse {
private static final Set<HttpMethod> SAFE_METHODS = EnumSet.of(HttpMethod.GET, HttpMethod.HEAD);
final int statusCode;
private final HttpHeaders headers;
private final MultiValueMap<String, Cookie> cookies;
protected AbstractServerResponse(
int statusCode, HttpHeaders headers, MultiValueMap<String, Cookie> cookies) {
this.statusCode = statusCode;
this.headers = HttpHeaders.readOnlyHttpHeaders(headers);
this.cookies =
CollectionUtils.unmodifiableMultiValueMap(new LinkedMultiValueMap<>(cookies));
}
@Override
public final HttpStatus statusCode() {
return HttpStatus.valueOf(this.statusCode);
}
@Override
public int rawStatusCode() {
return this.statusCode;
}
@Override
public final HttpHeaders headers() {
return this.headers;
}
@Override
public MultiValueMap<String, Cookie> cookies() {
return this.cookies;
}
@Override
public ModelAndView writeTo(HttpServletRequest request, HttpServletResponse response,
Context context) throws ServletException, IOException {
try {
writeStatusAndHeaders(response);
long lastModified = headers().getLastModified();
ServletWebRequest servletWebRequest = new ServletWebRequest(request, response);
HttpMethod httpMethod = HttpMethod.resolve(request.getMethod());
if (SAFE_METHODS.contains(httpMethod) &&
servletWebRequest.checkNotModified(headers().getETag(), lastModified)) {
return null;
}
else {
return writeToInternal(request, response, context);
}
}
catch (Throwable throwable) {
return handleError(throwable, request, response, context);
}
}
private void writeStatusAndHeaders(HttpServletResponse response) {
response.setStatus(this.statusCode);
writeHeaders(response);
writeCookies(response);
}
private void writeHeaders(HttpServletResponse servletResponse) {
this.headers.forEach((headerName, headerValues) -> {
for (String headerValue : headerValues) {
servletResponse.addHeader(headerName, headerValue);
}
});
// HttpServletResponse exposes some headers as properties: we should include those if not already present
if (servletResponse.getContentType() == null && this.headers.getContentType() != null) {
servletResponse.setContentType(this.headers.getContentType().toString());
}
if (servletResponse.getCharacterEncoding() == null &&
this.headers.getContentType() != null &&
this.headers.getContentType().getCharset() != null) {
servletResponse.setCharacterEncoding(this.headers.getContentType().getCharset().name());
}
}
private void writeCookies(HttpServletResponse servletResponse) {
this.cookies.values().stream()
.flatMap(Collection::stream)
.forEach(servletResponse::addCookie);
}
@Nullable
protected abstract ModelAndView writeToInternal(
HttpServletRequest request, HttpServletResponse response, Context context)
throws ServletException, IOException;
}
private static class WriterFunctionResponse extends AbstractServerResponse {
private final BiFunction<HttpServletRequest, HttpServletResponse, ModelAndView> writeFunction;
......
......@@ -230,11 +230,6 @@ public interface ServerResponse {
* <p>This method can be used to set the response status code, headers, and
* body based on an asynchronous result. If only the body is asynchronous,
* {@link BodyBuilder#body(Object)} can be used instead.
*
* <p><strong>Note</strong> that
* {@linkplain RenderingResponse rendering responses}, as returned by
* {@link BodyBuilder#render}, are <strong>not</strong> supported as value
* for {@code asyncResponse}. Use WebFlux.fn for asynchronous rendering.
* @param asyncResponse a {@code CompletableFuture<ServerResponse>} or
* {@code Publisher<ServerResponse>}
* @return the asynchronous response
......@@ -255,11 +250,6 @@ public interface ServerResponse {
* <p>This method can be used to set the response status code, headers, and
* body based on an asynchronous result. If only the body is asynchronous,
* {@link BodyBuilder#body(Object)} can be used instead.
*
* <p><strong>Note</strong> that
* {@linkplain RenderingResponse rendering responses}, as returned by
* {@link BodyBuilder#render}, are <strong>not</strong> supported as value
* for {@code asyncResponse}. Use WebFlux.fn for asynchronous rendering.
* @param asyncResponse a {@code CompletableFuture<ServerResponse>} or
* {@code Publisher<ServerResponse>}
* @param timeout maximum time period to wait for before timing out
......@@ -270,6 +260,65 @@ public interface ServerResponse {
return AsyncServerResponse.create(asyncResponse, timeout);
}
/**
* Create a server-sent event response. The {@link SseBuilder} provided to
* {@code consumer} can be used to build and send events.
*
* <p>For example:
* <pre class="code">
* public ServerResponse handleSse(ServerRequest request) {
* return ServerResponse.sse(sse -> sse.send("Hello World!"));
* }
* </pre>
*
* <p>or, to set both the id and event type:
* <pre class="code">
* public ServerResponse handleSse(ServerRequest request) {
* return ServerResponse.sse(sse -> sse
* .id("42)
* .event("event")
* .send("Hello World!"));
* }
* </pre>
* @param consumer consumer that will be provided with an event builder
* @return the server-side event response
* @since 5.3.2
* @see <a href="https://www.w3.org/TR/eventsource/">Server-Sent Events</a>
*/
static ServerResponse sse(Consumer<SseBuilder> consumer) {
return SseServerResponse.create(consumer, null);
}
/**
* Create a server-sent event response. The {@link SseBuilder} provided to
* {@code consumer} can be used to build and send events.
*
* <p>For example:
* <pre class="code">
* public ServerResponse handleSse(ServerRequest request) {
* return ServerResponse.sse(sse -> sse.send("Hello World!"));
* }
* </pre>
*
* <p>or, to set both the id and event type:
* <pre class="code">
* public ServerResponse handleSse(ServerRequest request) {
* return ServerResponse.sse(sse -> sse
* .id("42)
* .event("event")
* .send("Hello World!"));
* }
* </pre>
* @param consumer consumer that will be provided with an event builder
* @param timeout maximum time period to wait before timing out
* @return the server-side event response
* @since 5.3.2
* @see <a href="https://www.w3.org/TR/eventsource/">Server-Sent Events</a>
*/
static ServerResponse sse(Consumer<SseBuilder> consumer, Duration timeout) {
return SseServerResponse.create(consumer, timeout);
}
/**
* Defines a builder that adds headers to the response.
......@@ -473,6 +522,105 @@ public interface ServerResponse {
}
/**
* Defines a builder for a body that sends server-sent events.
*
* @since 5.3.2
*/
interface SseBuilder {
/**
* Sends the given object as a server-sent event.
* Strings will be sent as UTF-8 encoded bytes, and other objects will
* be converted into JSON using
* {@linkplain HttpMessageConverter message converters}.
*
* <p>This convenience method has the same effect as
* {@link #data(Object)}.
* @param object the object to send
* @throws IOException in case of I/O errors
*/
void send(Object object) throws IOException;
/**
* Add an SSE "id" line.
* @param id the event identifier
* @return this builder
*/
SseBuilder id(String id);
/**
* Add an SSE "event" line.
* @param eventName the event name
* @return this builder
*/
SseBuilder event(String eventName);
/**
* Add an SSE "retry" line.
* @param duration the duration to convert into millis
* @return this builder
*/
SseBuilder retry(Duration duration);
/**
* Add an SSE comment.
* @param comment the comment
* @return this builder
*/
SseBuilder comment(String comment);
/**
* Add an SSE "data" line for the given object and sends the built
* server-sent event to the client.
* Strings will be sent as UTF-8 encoded bytes, and other objects will
* be converted into JSON using
* {@linkplain HttpMessageConverter message converters}.
* @param object the object to send as data
* @throws IOException in case of I/O errors
*/
void data(Object object) throws IOException;
/**
* Completes the event stream with the given error.
*
* <p>The throwable is dispatched back into Spring MVC, and passed to
* its exception handling mechanism. Since the response has
* been committed by this point, the response status can not change.
* @param t the throwable to dispatch
*/
void error(Throwable t);
/**
* Completes the event stream.
*/
void complete();
/**
* Register a callback to be invoked when an SSE request times
* out.
* @param onTimeout the callback to invoke on timeout
* @return this builder
*/
SseBuilder onTimeout(Runnable onTimeout);
/**
* Register a callback to be invoked when an error occurs during SSE
* processing.
* @param onError the callback to invoke on error
* @return this builder
*/
SseBuilder onError(Consumer<Throwable> onError);
/**
* Register a callback to be invoked when the SSE request completes.
* @param onCompletion the callback to invoked on completion
* @return this builder
*/
SseBuilder onComplete(Runnable onCompletion);
}
/**
* Defines the context used during the {@link #writeTo(HttpServletRequest, HttpServletResponse, Context)}.
*/
......
/*
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.servlet.function;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import javax.servlet.ServletException;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.springframework.http.CacheControl;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.server.DelegatingServerHttpResponse;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpResponse;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.MultiValueMap;
import org.springframework.web.context.request.async.DeferredResult;
import org.springframework.web.servlet.ModelAndView;
/**
* Implementation of {@link ServerResponse} for sending
* <a href="https://www.w3.org/TR/eventsource/">Server-Sent Events</a>.
*
* @author Arjen Poutsma
* @since 5.3.2
*/
final class SseServerResponse extends AbstractServerResponse {
private final Consumer<SseBuilder> sseConsumer;
@Nullable
private final Duration timeout;
private SseServerResponse(Consumer<SseBuilder> sseConsumer, @Nullable Duration timeout) {
super(200, createHeaders(), emptyCookies());
this.sseConsumer = sseConsumer;
this.timeout = timeout;
}
private static HttpHeaders createHeaders() {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.TEXT_EVENT_STREAM);
headers.setCacheControl(CacheControl.noCache());
return headers;
}
private static MultiValueMap<String, Cookie> emptyCookies() {
return CollectionUtils.toMultiValueMap(Collections.emptyMap());
}
@Nullable
@Override
protected ModelAndView writeToInternal(HttpServletRequest request, HttpServletResponse response,
Context context) throws ServletException, IOException {
DeferredResult<?> result;
if (this.timeout != null) {
result = new DeferredResult<>(this.timeout.toMillis());
}
else {
result = new DeferredResult<>();
}
AsyncServerResponse.writeAsync(request, response, result);
this.sseConsumer.accept(new DefaultSseBuilder(response, context, result));
return null;
}
public static ServerResponse create(Consumer<SseBuilder> sseConsumer, @Nullable Duration timeout) {
Assert.notNull(sseConsumer, "SseConsumer must not be null");
return new SseServerResponse(sseConsumer, timeout);
}
private static final class DefaultSseBuilder implements SseBuilder {
private static final byte[] NL_NL = new byte[]{'\n', '\n'};
private final ServerHttpResponse outputMessage;
private final DeferredResult<?> deferredResult;
private final List<HttpMessageConverter<?>> messageConverters;
private final StringBuilder builder = new StringBuilder();
private boolean sendFailed;
public DefaultSseBuilder(HttpServletResponse response, Context context, DeferredResult<?> deferredResult) {
this.outputMessage = new ServletServerHttpResponse(response);
this.deferredResult = deferredResult;
this.messageConverters = context.messageConverters();
}
@Override
public void send(Object object) throws IOException {
data(object);
}
@Override
public SseBuilder id(String id) {
Assert.hasLength(id, "Id must not be empty");
return field("id", id);
}
@Override
public SseBuilder event(String eventName) {
Assert.hasLength(eventName, "Name must not be empty");
return field("event", eventName);
}
@Override
public SseBuilder retry(Duration duration) {
Assert.notNull(duration, "Duration must not be null");
String millis = Long.toString(duration.toMillis());
return field("retry", millis);
}
@Override
public SseBuilder comment(String comment) {
Assert.hasLength(comment, "Comment must not be empty");
String[] lines = comment.split("\n");
for (String line : lines) {
field("", line);
}
return this;
}
private SseBuilder field(String name, String value) {
this.builder.append(name).append(':').append(value).append('\n');
return this;
}
@Override
public void data(Object object) throws IOException {
Assert.notNull(object, "Object must not be null");
if (object instanceof String) {
writeString((String) object);
}
else {
writeObject(object);
}
}
private void writeString(String string) throws IOException {
String[] lines = string.split("\n");
for (String line : lines) {
field("data", line);
}
this.builder.append('\n');
try {
OutputStream body = this.outputMessage.getBody();
body.write(builderBytes());
body.flush();
}
catch (IOException ex) {
this.sendFailed = true;
throw ex;
}
finally {
this.builder.setLength(0);
}
}
@SuppressWarnings("unchecked")
private void writeObject(Object data) throws IOException {
this.builder.append("data:");
try {
this.outputMessage.getBody().write(builderBytes());
Class<?> dataClass = data.getClass();
for (HttpMessageConverter<?> converter : this.messageConverters) {
if (converter.canWrite(dataClass, MediaType.APPLICATION_JSON)) {
HttpMessageConverter<Object> objectConverter = (HttpMessageConverter<Object>) converter;
ServerHttpResponse response = new MutableHeadersServerHttpResponse(this.outputMessage);
objectConverter.write(data, MediaType.APPLICATION_JSON, response);
this.outputMessage.getBody().write(NL_NL);
this.outputMessage.flush();
return;
}
}
}
catch (IOException ex) {
this.sendFailed = true;
throw ex;
}
finally {
this.builder.setLength(0);
}
}
private byte[] builderBytes() {
return this.builder.toString().getBytes(StandardCharsets.UTF_8);
}
@Override
public void error(Throwable t) {
if (this.sendFailed) {
return;
}
this.deferredResult.setErrorResult(t);
}
@Override
public void complete() {
if (this.sendFailed) {
return;
}
try {
this.outputMessage.flush();
this.deferredResult.setResult(null);
}
catch (IOException ex) {
this.deferredResult.setErrorResult(ex);
}
}
@Override
public SseBuilder onTimeout(Runnable onTimeout) {
this.deferredResult.onTimeout(onTimeout);
return this;
}
@Override
public SseBuilder onError(Consumer<Throwable> onError) {
this.deferredResult.onError(onError);
return this;
}
@Override
public SseBuilder onComplete(Runnable onCompletion) {
this.deferredResult.onCompletion(onCompletion);
return this;
}
/**
* Wrap to silently ignore header changes HttpMessageConverter's that would
* otherwise cause HttpHeaders to raise exceptions.
*/
private static final class MutableHeadersServerHttpResponse extends DelegatingServerHttpResponse {
private final HttpHeaders mutableHeaders = new HttpHeaders();
public MutableHeadersServerHttpResponse(ServerHttpResponse delegate) {
super(delegate);
this.mutableHeaders.putAll(delegate.getHeaders());
}
@Override
public HttpHeaders getHeaders() {
return this.mutableHeaders;
}
}
}
}
......@@ -17,7 +17,6 @@
package org.springframework.web.servlet.mvc.method.annotation;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
......@@ -31,11 +30,11 @@ import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.ResolvableType;
import org.springframework.core.task.TaskExecutor;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.http.server.DelegatingServerHttpResponse;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpResponse;
import org.springframework.lang.Nullable;
......@@ -255,41 +254,20 @@ public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodRetur
* Wrap to silently ignore header changes HttpMessageConverter's that would
* otherwise cause HttpHeaders to raise exceptions.
*/
private static class StreamingServletServerHttpResponse implements ServerHttpResponse {
private final ServerHttpResponse delegate;
private static class StreamingServletServerHttpResponse extends DelegatingServerHttpResponse {
private final HttpHeaders mutableHeaders = new HttpHeaders();
public StreamingServletServerHttpResponse(ServerHttpResponse delegate) {
this.delegate = delegate;
super(delegate);
this.mutableHeaders.putAll(delegate.getHeaders());
}
@Override
public void setStatusCode(HttpStatus status) {
this.delegate.setStatusCode(status);
}
@Override
public HttpHeaders getHeaders() {
return this.mutableHeaders;
}
@Override
public OutputStream getBody() throws IOException {
return this.delegate.getBody();
}
@Override
public void flush() throws IOException {
this.delegate.flush();
}
@Override
public void close() {
this.delegate.close();
}
}
}
/*
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.servlet.function;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.Collections;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.web.servlet.ModelAndView;
import org.springframework.web.testfixture.servlet.MockHttpServletRequest;
import org.springframework.web.testfixture.servlet.MockHttpServletResponse;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Arjen Poutsma
*/
class SseServerResponseTests {
private MockHttpServletRequest mockRequest;
private MockHttpServletResponse mockResponse;
@BeforeEach
void setUp() {
this.mockRequest = new MockHttpServletRequest("GET", "https://example.com");
this.mockRequest.setAsyncSupported(true);
this.mockResponse = new MockHttpServletResponse();
}
@Test
void sendString() throws Exception {
String body = "foo bar";
ServerResponse response = ServerResponse.sse(sse -> {
try {
sse.send(body);
}
catch (IOException ex) {
throw new UncheckedIOException(ex);
}
});
ServerResponse.Context context = Collections::emptyList;
ModelAndView mav = response.writeTo(this.mockRequest, this.mockResponse, context);
assertThat(mav).isNull();
String expected = "data:" + body + "\n\n";
assertThat(this.mockResponse.getContentAsString()).isEqualTo(expected);
}
@Test
void sendObject() throws Exception {
Person person = new Person("John Doe", 42);
ServerResponse response = ServerResponse.sse(sse -> {
try {
sse.send(person);
}
catch (IOException ex) {
throw new UncheckedIOException(ex);
}
});
ServerResponse.Context context = () -> Collections.singletonList(new MappingJackson2HttpMessageConverter());
ModelAndView mav = response.writeTo(this.mockRequest, this.mockResponse, context);
assertThat(mav).isNull();
String expected = "data:{\"name\":\"John Doe\",\"age\":42}\n\n";
assertThat(this.mockResponse.getContentAsString()).isEqualTo(expected);
}
@Test
public void builder() throws Exception {
String body = "foo bar";
ServerResponse response = ServerResponse.sse(sse -> {
try {
sse.id("id")
.event("name")
.comment("comment line 1\ncomment line 2")
.retry(Duration.ofSeconds(1))
.data("data");
}
catch (IOException ex) {
throw new UncheckedIOException(ex);
}
});
ServerResponse.Context context = Collections::emptyList;
ModelAndView mav = response.writeTo(this.mockRequest, this.mockResponse, context);
assertThat(mav).isNull();
String expected = "id:id\n" +
"event:name\n" +
":comment line 1\n" +
":comment line 2\n" +
"retry:1000\n" +
"data:data\n\n";
assertThat(this.mockResponse.getContentAsString()).isEqualTo(expected);
}
private static final class Person {
private final String name;
private final int age;
public Person(String name, int age) {
this.name = name;
this.age = age;
}
public String getName() {
return this.name;
}
public int getAge() {
return this.age;
}
}
}
......@@ -229,6 +229,60 @@ Mono<ServerResponse> asyncResponse = webClient.get().retrieve().bodyToMono(Perso
ServerResponse.async(asyncResponse);
----
https://www.w3.org/TR/eventsource/[Server-Sent Events] can be provided via the
static `sse` method on `ServerResponse`. The builder provided by that method
allows you to send Strings, or other objects as JSON. For example:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
public RouterFunction<ServerResponse> sse() {
return route(GET("/sse"), request -> ServerResponse.sse(sseBuilder -> {
// Save the sseBuilder object somewhere..
}));
}
// In some other thread, sending a String
sseBuilder.send("Hello world");
// Or an object, which will be transformed into JSON
Person person = ...
sseBuilder.send(person);
// Customize the event by using the other methods
sseBuilder.id("42")
.event("sse event")
.data(person);
// and done at some point
sseBuilder.complete();
----
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
.Kotlin
----
fun sse(): RouterFunction<ServerResponse> = router {
GET("/sse") { request -> ServerResponse.sse { sseBuilder ->
// Save the sseBuilder object somewhere..
}
}
// In some other thread, sending a String
sseBuilder.send("Hello world")
// Or an object, which will be transformed into JSON
val person = ...
sseBuilder.send(person)
// Customize the event by using the other methods
sseBuilder.id("42")
.event("sse event")
.data(person)
// and done at some point
sseBuilder.complete()
----
[[webmvc-fn-handler-classes]]
=== Handler Classes
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册