提交 9aa25c39 编写于 作者: R Rossen Stoyanchev

Polish ServerSentEventHttpMessageReader

上级 d57d3efc
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -23,9 +23,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
......@@ -34,7 +32,6 @@ import reactor.util.function.Tuples;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.CodecException;
import org.springframework.core.codec.Decoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
......@@ -43,18 +40,22 @@ import org.springframework.http.ReactiveHttpInputMessage;
import org.springframework.util.Assert;
import org.springframework.util.MimeTypeUtils;
import static java.util.stream.Collectors.joining;
/**
* Reader that supports a stream of {@link ServerSentEvent}s and also plain
* {@link Object}s which is the same as an {@link ServerSentEvent} with data
* only.
*
* @author Sebastien Deleuze
* @author Rossen Stoyanchev
* @since 5.0
*/
public class ServerSentEventHttpMessageReader implements HttpMessageReader<Object> {
private static final IntPredicate NEWLINE_DELIMITER = b -> b == '\n' || b == '\r';
private final List<Decoder<?>> dataDecoders;
......@@ -75,51 +76,31 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader<Objec
}
@Override
public Flux<Object> read(ResolvableType elementType, ReactiveHttpInputMessage inputMessage, Map<String, Object> hints) {
boolean isSseElementType = ServerSentEvent.class.isAssignableFrom(elementType.getRawClass());
ResolvableType dataType = (isSseElementType ? elementType.getGeneric(0) : elementType);
public List<MediaType> getReadableMediaTypes() {
return Collections.singletonList(MediaType.TEXT_EVENT_STREAM);
}
@Override
public Flux<Object> read(ResolvableType elementType, ReactiveHttpInputMessage inputMessage,
Map<String, Object> hints) {
boolean hasSseWrapper = ServerSentEvent.class.isAssignableFrom(elementType.getRawClass());
ResolvableType dataType = (hasSseWrapper ? elementType.getGeneric(0) : elementType);
return Flux.from(inputMessage.getBody())
.concatMap(ServerSentEventHttpMessageReader::splitOnNewline)
.map(buffer -> Tuples.of(decodeDataBuffer(buffer), buffer.factory()))
.map(buffer -> {
CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer.asByteBuffer());
DataBufferUtils.release(buffer);
return Tuples.of(charBuffer.toString(), buffer.factory());
})
.bufferUntil(data -> data.getT1().equals("\n"))
.concatMap(list -> {
ServerSentEvent.Builder<Object> sseBuilder = ServerSentEvent.builder();
StringBuilder dataBuilder = new StringBuilder();
StringBuilder commentBuilder = new StringBuilder();
DataBufferFactory bufferFactory = list.stream().findFirst().get().getT2();
String[] lines = list.stream().map(t -> t.getT1()).collect(Collectors.joining()).split("\\r?\\n");
for (String line : lines) {
if (line.startsWith("id:")) {
sseBuilder.id(line.substring(3));
}
else if (line.startsWith("event:")) {
sseBuilder.event(line.substring(6));
}
else if (line.startsWith("data:")) {
dataBuilder.append(line.substring(5)).append("\n");
}
else if (line.startsWith("retry:")) {
sseBuilder.retry(Duration.ofMillis(Long.valueOf(line.substring(6))));
}
else if (line.startsWith(":")) {
commentBuilder.append(line.substring(1)).append("\n");
}
}
if (dataBuilder.length() > 0) {
String data = dataBuilder.toString();
if (String.class.isAssignableFrom(dataType.getRawClass())) {
sseBuilder.data(data.substring(0, data.length() - 1));
}
else {
sseBuilder.data(decode(data, bufferFactory, dataType, hints));
}
}
if (commentBuilder.length() > 0) {
String comment = commentBuilder.toString();
sseBuilder.comment(comment.substring(0, comment.length() - 1));
}
ServerSentEvent<Object> sse = sseBuilder.build();
return (isSseElementType ? Mono.just(sse) : Mono.justOrEmpty(sse.data()));
.concatMap(tuples -> {
String[] lines = tuples.stream().map(t -> t.getT1()).collect(joining()).split("\\r?\\n");
DataBufferFactory factory = tuples.stream().findAny().get().getT2();
ServerSentEvent<Object> event = buildEvent(lines, factory, dataType, hints);
return (hasSseWrapper ? Mono.just(event) : Mono.justOrEmpty(event.data()));
})
.cast(Object.class);
}
......@@ -141,29 +122,69 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader<Objec
return Flux.fromIterable(results);
}
private String decodeDataBuffer(DataBuffer dataBuffer) {
CharBuffer charBuffer = StandardCharsets.UTF_8.decode(dataBuffer.asByteBuffer());
DataBufferUtils.release(dataBuffer);
return charBuffer.toString();
}
private ServerSentEvent<Object> buildEvent(String[] lines, DataBufferFactory bufferFactory,
ResolvableType dataType, Map<String, Object> hints) {
ServerSentEvent.Builder<Object> sseBuilder = ServerSentEvent.builder();
StringBuilder mutableData = new StringBuilder();
StringBuilder mutableComment = new StringBuilder();
for (String line : lines) {
if (line.startsWith("id:")) {
sseBuilder.id(line.substring(3));
}
else if (line.startsWith("event:")) {
sseBuilder.event(line.substring(6));
}
else if (line.startsWith("data:")) {
mutableData.append(line.substring(5)).append("\n");
}
else if (line.startsWith("retry:")) {
sseBuilder.retry(Duration.ofMillis(Long.valueOf(line.substring(6))));
}
else if (line.startsWith(":")) {
mutableComment.append(line.substring(1)).append("\n");
}
}
@SuppressWarnings("unchecked")
private <T> T decode(String data, DataBufferFactory bufferFactory, ResolvableType elementType, Map<String, Object> hints) {
Optional<Decoder<?>> decoder = dataDecoders
.stream()
.filter(e -> e.canDecode(elementType, MimeTypeUtils.APPLICATION_JSON))
.findFirst();
return ((Decoder<T>) decoder.orElseThrow(() -> new CodecException("No suitable decoder found!")))
.decodeToMono(Mono.just(bufferFactory.wrap(data.getBytes(StandardCharsets.UTF_8))), elementType, MimeTypeUtils.APPLICATION_JSON, hints).block();
if (mutableData.length() > 0) {
String data = mutableData.toString();
sseBuilder.data(decodeData(data, bufferFactory, dataType, hints));
}
if (mutableComment.length() > 0) {
String comment = mutableComment.toString();
sseBuilder.comment(comment.substring(0, comment.length() - 1));
}
return sseBuilder.build();
}
@Override
public Mono<Object> readMono(ResolvableType elementType, ReactiveHttpInputMessage inputMessage, Map<String, Object> hints) {
return Mono.error(new UnsupportedOperationException("ServerSentEventHttpMessageReader only supports reading stream of events as a Flux"));
private Object decodeData(String data, DataBufferFactory bufferFactory, ResolvableType dataType,
Map<String, Object> hints) {
if (String.class.isAssignableFrom(dataType.getRawClass())) {
return data.substring(0, data.length() - 1);
}
DataBuffer dataBuffer = bufferFactory.wrap(data.getBytes(StandardCharsets.UTF_8));
return this.dataDecoders.stream()
.filter(e -> e.canDecode(dataType, MimeTypeUtils.APPLICATION_JSON))
.findFirst()
.orElseThrow(() -> new CodecException("No suitable decoder found!"))
.decodeToMono(Mono.just(dataBuffer), dataType, MimeTypeUtils.APPLICATION_JSON, hints)
.block(Duration.ZERO);
}
@Override
public List<MediaType> getReadableMediaTypes() {
return Collections.singletonList(MediaType.TEXT_EVENT_STREAM);
public Mono<Object> readMono(ResolvableType elementType, ReactiveHttpInputMessage inputMessage,
Map<String, Object> hints) {
return Mono.error(new UnsupportedOperationException(
"ServerSentEventHttpMessageReader only supports reading stream of events as a Flux"));
}
}
......@@ -85,6 +85,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
.thenCancel()
.verify(Duration.ofSeconds(5L));
}
@Test
public void sseAsPerson() throws Exception {
Flux<Person> result = this.webClient.get()
......@@ -230,9 +231,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
@Override
public String toString() {
return "Person{" +
"name='" + name + '\'' +
'}';
return "Person{name='" + this.name + '\'' + '}';
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册