ServerSentEventHttpMessageWriter.java 7.1 KB
Newer Older
1
/*
2
 * Copyright 2002-2018 the original author or authors.
3 4 5 6 7 8 9 10 11 12 13 14 15 16
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

17
package org.springframework.http.codec;
18

19
import java.nio.charset.StandardCharsets;
20
import java.time.Duration;
21
import java.util.Collections;
22
import java.util.List;
23
import java.util.Map;
24 25 26 27 28 29

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.core.ResolvableType;
30
import org.springframework.core.codec.CodecException;
31
import org.springframework.core.codec.Encoder;
32
import org.springframework.core.codec.Hints;
33 34
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
35 36
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.PooledDataBuffer;
37
import org.springframework.http.MediaType;
38
import org.springframework.http.ReactiveHttpOutputMessage;
39 40
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
41
import org.springframework.lang.Nullable;
42
import org.springframework.util.Assert;
43
import org.springframework.util.StringUtils;
44 45

/**
46
 * {@code HttpMessageWriter} for {@code "text/event-stream"} responses.
47
 *
48
 * @author Sebastien Deleuze
49
 * @author Arjen Poutsma
50
 * @author Rossen Stoyanchev
51
 * @since 5.0
52
 */
53
public class ServerSentEventHttpMessageWriter implements HttpMessageWriter<Object> {
54

55 56
	private static final MediaType DEFAULT_MEDIA_TYPE = new MediaType("text", "event-stream", StandardCharsets.UTF_8);

57
	private static final List<MediaType> WRITABLE_MEDIA_TYPES = Collections.singletonList(MediaType.TEXT_EVENT_STREAM);
58

59

60
	@Nullable
61
	private final Encoder<?> encoder;
62

R
Polish  
Rossen Stoyanchev 已提交
63

64 65 66 67 68 69 70 71
	/**
	 * Constructor without an {@code Encoder}. In this mode only {@code String}
	 * is supported for event data to be encoded.
	 */
	public ServerSentEventHttpMessageWriter() {
		this(null);
	}

72
	/**
73 74 75
	 * Constructor with JSON {@code Encoder} for encoding objects.
	 * Support for {@code String} event data is built-in.
	 * @param encoder the Encoder to use (may be {@code null})
76
	 */
77
	public ServerSentEventHttpMessageWriter(@Nullable Encoder<?> encoder) {
78
		this.encoder = encoder;
79 80
	}

81

82
	/**
83
	 * Return the configured {@code Encoder}, if any.
84
	 */
85
	@Nullable
86 87 88 89
	public Encoder<?> getEncoder() {
		return this.encoder;
	}

90 91 92
	@Override
	public List<MediaType> getWritableMediaTypes() {
		return WRITABLE_MEDIA_TYPES;
93 94
	}

R
Polish  
Rossen Stoyanchev 已提交
95

96
	@Override
97
	public boolean canWrite(ResolvableType elementType, @Nullable MediaType mediaType) {
98
		return (mediaType == null || MediaType.TEXT_EVENT_STREAM.includes(mediaType) ||
99
				ServerSentEvent.class.isAssignableFrom(elementType.toClass()));
100 101 102
	}

	@Override
103
	public Mono<Void> write(Publisher<?> input, ResolvableType elementType, @Nullable MediaType mediaType,
104
			ReactiveHttpOutputMessage message, Map<String, Object> hints) {
105

106 107 108 109 110
		mediaType = (mediaType != null && mediaType.getCharset() != null ? mediaType : DEFAULT_MEDIA_TYPE);
		DataBufferFactory bufferFactory = message.bufferFactory();

		message.getHeaders().setContentType(mediaType);
		return message.writeAndFlushWith(encode(input, elementType, mediaType, bufferFactory, hints));
111
	}
112

113 114
	private Flux<Publisher<DataBuffer>> encode(Publisher<?> input, ResolvableType elementType,
			MediaType mediaType, DataBufferFactory factory, Map<String, Object> hints) {
115

116 117
		Class<?> elementClass = elementType.getRawClass();
		ResolvableType valueType = (elementClass != null && ServerSentEvent.class.isAssignableFrom(elementClass) ?
118
				elementType.getGeneric() : elementType);
119

120
		return Flux.from(input).map(element -> {
121

122 123
			ServerSentEvent<?> sse = (element instanceof ServerSentEvent ?
					(ServerSentEvent<?>) element : ServerSentEvent.builder().data(element).build());
124 125

			StringBuilder sb = new StringBuilder();
126 127 128 129 130 131 132
			String id = sse.id();
			String event = sse.event();
			Duration retry = sse.retry();
			String comment = sse.comment();
			Object data = sse.data();
			if (id != null) {
				writeField("id", id, sb);
133
			}
134 135
			if (event != null) {
				writeField("event", event, sb);
136
			}
137 138
			if (retry != null) {
				writeField("retry", retry.toMillis(), sb);
139
			}
140
			if (comment != null) {
141
				sb.append(':').append(StringUtils.replace(comment, "\n", "\n:")).append("\n");
142
			}
143
			if (data != null) {
144 145
				sb.append("data:");
			}
146

147 148
			return Flux.concat(encodeText(sb, mediaType, factory),
					encodeData(data, valueType, mediaType, factory, hints),
149 150
					encodeText("\n", mediaType, factory))
					.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
151
		});
152
	}
153

154 155 156 157 158
	private void writeField(String fieldName, Object fieldValue, StringBuilder stringBuilder) {
		stringBuilder.append(fieldName);
		stringBuilder.append(':');
		stringBuilder.append(fieldValue.toString());
		stringBuilder.append("\n");
159 160
	}

R
Rossen Stoyanchev 已提交
161
	@SuppressWarnings("unchecked")
162
	private <T> Flux<DataBuffer> encodeData(@Nullable T data, ResolvableType valueType,
163
			MediaType mediaType, DataBufferFactory factory, Map<String, Object> hints) {
164 165 166 167 168 169 170

		if (data == null) {
			return Flux.empty();
		}

		if (data instanceof String) {
			String text = (String) data;
171
			return Flux.from(encodeText(StringUtils.replace(text, "\n", "\ndata:") + "\n", mediaType, factory));
172 173
		}

174 175 176 177
		if (this.encoder == null) {
			return Flux.error(new CodecException("No SSE encoder configured and the data is not String."));
		}

178
		return ((Encoder<T>) this.encoder)
179 180
				.encode(Mono.just(data), factory, valueType, mediaType, hints)
				.concatWith(encodeText("\n", mediaType, factory));
R
Rossen Stoyanchev 已提交
181 182
	}

183 184 185
	private Mono<DataBuffer> encodeText(CharSequence text, MediaType mediaType, DataBufferFactory bufferFactory) {
		Assert.notNull(mediaType.getCharset(), "Expected MediaType with charset");
		byte[] bytes = text.toString().getBytes(mediaType.getCharset());
186 187
		return Mono.defer(() ->
				Mono.just(bufferFactory.allocateBuffer(bytes.length).write(bytes)));
188 189
	}

190
	@Override
191
	public Mono<Void> write(Publisher<?> input, ResolvableType actualType, ResolvableType elementType,
192
			@Nullable MediaType mediaType, ServerHttpRequest request, ServerHttpResponse response,
193 194
			Map<String, Object> hints) {

195 196
		Map<String, Object> allHints = Hints.merge(hints,
				getEncodeHints(actualType, elementType, mediaType, request, response));
197

198 199 200 201
		return write(input, elementType, mediaType, response, allHints);
	}

	private Map<String, Object> getEncodeHints(ResolvableType actualType, ResolvableType elementType,
202
			@Nullable MediaType mediaType, ServerHttpRequest request, ServerHttpResponse response) {
203

204
		if (this.encoder instanceof HttpMessageEncoder) {
205 206
			HttpMessageEncoder<?> encoder = (HttpMessageEncoder<?>) this.encoder;
			return encoder.getEncodeHints(actualType, elementType, mediaType, request, response);
207
		}
208
		return Hints.none();
209 210
	}

211
}