提交 917a2fb9 编写于 作者: S Sebastien Deleuze 提交者: Rossen Stoyanchev

Add Decoder#decodeOne()

This commit adds a Decoder#decodeOne() method in order
to handle correctly the streaming versus one value
deserialization based on the type provided by the user.

For example, if a List parameter is provided in a controller
method, Jackson will be called once, while if the user provides
a Flux or an Observable parameter, Jackson will be called for
each element.
上级 12d7b781
......@@ -20,8 +20,11 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Decoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.util.MimeType;
/**
......@@ -51,4 +54,8 @@ public abstract class AbstractDecoder<T> implements Decoder<T> {
anyMatch(mt -> mt.isCompatibleWith(mimeType));
}
@Override
public Mono<T> decodeOne(Publisher<DataBuffer> inputStream, ResolvableType elementType, MimeType mimeType, Object... hints) {
throw new UnsupportedOperationException();
}
}
......@@ -20,6 +20,7 @@ import java.util.List;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
......@@ -60,6 +61,20 @@ public interface Decoder<T> {
Flux<T> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
MimeType mimeType, Object... hints);
/**
* Decode a {@link DataBuffer} input stream into a Mono of {@code T}.
*
* @param inputStream the {@code DataBuffer} input stream to decode
* @param elementType the expected type of elements in the output stream;
* this type must have been previously passed to the {@link #canDecode}
* method and it must have returned {@code true}.
* @param mimeType the MIME type associated with the input stream, optional
* @param hints additional information about how to do decode, optional
* @return the output stream with the decoded element
*/
Mono<T> decodeOne(Publisher<DataBuffer> inputStream, ResolvableType elementType,
MimeType mimeType, Object... hints);
/**
* Return the list of MIME types this decoder supports.
*/
......
......@@ -25,6 +25,7 @@ import java.util.function.IntPredicate;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
......@@ -94,6 +95,21 @@ public class StringDecoder extends AbstractDecoder<String> {
});
}
@Override
public Mono<String> decodeOne(Publisher<DataBuffer> inputStream, ResolvableType elementType,
MimeType mimeType, Object... hints) {
Charset charset = getCharset(mimeType);
return Flux.from(inputStream)
.map(dataBuffer -> {
CharBuffer charBuffer = charset.decode(dataBuffer.asByteBuffer());
DataBufferUtils.release(dataBuffer);
return charBuffer.toString();
})
.collect(StringBuilder::new, StringBuilder::append)
.map(StringBuilder::toString);
}
private static Flux<DataBuffer> splitOnNewline(DataBuffer dataBuffer) {
List<DataBuffer> results = new ArrayList<DataBuffer>();
int startIdx = 0;
......
......@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.type.TypeFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.CodecException;
......@@ -46,7 +47,9 @@ public class JacksonJsonDecoder extends AbstractDecoder<Object> {
private final ObjectMapper mapper;
private final Decoder<DataBuffer> preProcessor = new JsonObjectDecoder();
private final Decoder<DataBuffer> fluxPreProcessor = new JsonObjectDecoder();
private final Decoder<DataBuffer> monoPreProcessor = new JsonObjectDecoder(false);
public JacksonJsonDecoder() {
......@@ -69,7 +72,31 @@ public class JacksonJsonDecoder extends AbstractDecoder<Object> {
JavaType javaType = typeFactory.constructType(elementType.getType());
ObjectReader reader = this.mapper.readerFor(javaType);
return this.preProcessor.decode(inputStream, elementType, mimeType, hints)
return this.fluxPreProcessor.decode(inputStream, elementType, mimeType, hints)
.map(dataBuffer -> {
try {
Object value = reader.readValue(dataBuffer.asInputStream());
DataBufferUtils.release(dataBuffer);
return value;
}
catch (IOException e) {
return Flux.error(new CodecException("Error while reading the data", e));
}
});
}
@Override
public Mono<Object> decodeOne(Publisher<DataBuffer> inputStream, ResolvableType elementType,
MimeType mimeType, Object... hints) {
Assert.notNull(inputStream, "'inputStream' must not be null");
Assert.notNull(elementType, "'elementType' must not be null");
TypeFactory typeFactory = this.mapper.getTypeFactory();
JavaType javaType = typeFactory.constructType(elementType.getType());
ObjectReader reader = this.mapper.readerFor(javaType);
return this.monoPreProcessor.decode(inputStream, elementType, mimeType, hints)
.single()
.map(dataBuffer -> {
try {
Object value = reader.readValue(dataBuffer.asInputStream());
......
......@@ -125,6 +125,18 @@ public class CodecHttpMessageConverter<T> implements HttpMessageConverter<T> {
return this.decoder.decode(inputMessage.getBody(), type, contentType);
}
@Override
public Mono<T> readOne(ResolvableType type, ReactiveHttpInputMessage inputMessage) {
if (this.decoder == null) {
return Mono.error(new IllegalStateException("No decoder set"));
}
MediaType contentType = inputMessage.getHeaders().getContentType();
if (contentType == null) {
contentType = MediaType.APPLICATION_OCTET_STREAM;
}
return this.decoder.decodeOne(inputMessage.getBody(), type, contentType);
}
@Override
public Mono<Void> write(Publisher<? extends T> inputStream, ResolvableType type,
MediaType contentType, ReactiveHttpOutputMessage outputMessage) {
......
......@@ -50,16 +50,27 @@ public interface HttpMessageConverter<T> {
List<MediaType> getReadableMediaTypes();
/**
* Read an object of the given type form the given input message, and returns it.
* Read a {@link Flux} of the given type form the given input message, and returns it.
* @param type the type of object to return. This type must have previously been
* passed to the
* {@link #canRead canRead} method of this interface, which must have returned {@code
* true}.
* @param inputMessage the HTTP input message to read from
* @return the converted object
* @return the converted {@link Flux} of elements
*/
Flux<T> read(ResolvableType type, ReactiveHttpInputMessage inputMessage);
/**
* Read a {@link Mono} of the given type form the given input message, and returns it.
* @param type the type of object to return. This type must have previously been
* passed to the
* {@link #canRead canRead} method of this interface, which must have returned {@code
* true}.
* @param inputMessage the HTTP input message to read from
* @return the converted {@link Mono} of object
*/
Mono<T> readOne(ResolvableType type, ReactiveHttpInputMessage inputMessage);
/**
* Indicates whether the given class can be written by this converter.
* @param type the class to test for writability
......
......@@ -23,6 +23,7 @@ import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Observable;
import org.springframework.core.Conventions;
import org.springframework.core.MethodParameter;
......@@ -122,7 +123,8 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve
ResolvableType type = ResolvableType.forMethodParameter(parameter);
boolean isAsyncType = isAsyncType(type);
ResolvableType elementType = (isAsyncType ? type.getGeneric(0) : type);
boolean isStreamableType = isStreamableType(type);
ResolvableType elementType = (isStreamableType || isAsyncType ? type.getGeneric(0) : type);
MediaType mediaType = exchange.getRequest().getHeaders().getContentType();
if (mediaType == null) {
......@@ -131,23 +133,33 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve
for (HttpMessageConverter<?> converter : getMessageConverters()) {
if (converter.canRead(elementType, mediaType)) {
Flux<?> elementFlux = converter.read(elementType, exchange.getRequest());
if (this.validator != null) {
elementFlux= applyValidationIfApplicable(elementFlux, parameter);
}
if (Mono.class.equals(type.getRawClass())) {
return Mono.just(Mono.from(elementFlux));
}
else if (Flux.class.equals(type.getRawClass())) {
return Mono.just(elementFlux);
}
else if (isAsyncType) {
return Mono.just(getConversionService().convert(elementFlux, type.getRawClass()));
if (isStreamableType) {
Publisher<?> elements = converter.read(elementType, exchange.getRequest());
if (this.validator != null) {
elements= applyValidationIfApplicable(elements, parameter);
}
if (Flux.class.equals(type.getRawClass())) {
return Mono.just(elements);
}
else if (isAsyncType && this.conversionService.canConvert(Flux.class, type.getRawClass())) {
return Mono.just(this.conversionService.convert(elements, type.getRawClass()));
}
}
else {
return elementFlux.next().map(o -> o);
Mono<?> element = converter.readOne(elementType, exchange.getRequest());
if (this.validator != null) {
element = Mono.from(applyValidationIfApplicable(element, parameter));
}
if (Mono.class.equals(type.getRawClass())) {
return Mono.just(element);
}
else if (isAsyncType && this.conversionService.canConvert(Mono.class, type.getRawClass())) {
return Mono.just(this.conversionService.convert(element, type.getRawClass()));
}
else {
return (Mono<Object>)element;
}
}
}
}
......@@ -157,23 +169,28 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve
private boolean isAsyncType(ResolvableType type) {
return (Mono.class.equals(type.getRawClass()) || Flux.class.equals(type.getRawClass()) ||
getConversionService().canConvert(Publisher.class, type.getRawClass()));
getConversionService().canConvert(Mono.class, type.getRawClass()) ||
getConversionService().canConvert(Flux.class, type.getRawClass()));
}
private boolean isStreamableType(ResolvableType type) {
return this.conversionService.canConvert(Flux.class, type.getRawClass());
}
protected Flux<?> applyValidationIfApplicable(Flux<?> elementFlux, MethodParameter methodParam) {
protected Publisher<?> applyValidationIfApplicable(Publisher<?> elements, MethodParameter methodParam) {
Annotation[] annotations = methodParam.getParameterAnnotations();
for (Annotation ann : annotations) {
Validated validAnnot = AnnotationUtils.getAnnotation(ann, Validated.class);
if (validAnnot != null || ann.annotationType().getSimpleName().startsWith("Valid")) {
Object hints = (validAnnot != null ? validAnnot.value() : AnnotationUtils.getValue(ann));
Object[] validationHints = (hints instanceof Object[] ? (Object[]) hints : new Object[] {hints});
return elementFlux.map(element -> {
return Flux.from(elements).map(element -> {
validate(element, validationHints, methodParam);
return element;
});
}
}
return elementFlux;
return elements;
}
/**
......
......@@ -19,6 +19,7 @@ package org.springframework.core.codec;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import org.springframework.core.ResolvableType;
......@@ -96,4 +97,18 @@ public class StringDecoderTests extends AbstractDataBufferAllocatingTestCase {
.assertValues("");
}
@Test
public void decodeOne() throws InterruptedException {
this.decoder = new StringDecoder(false);
Flux<DataBuffer> source =
Flux.just(stringBuffer("foo"), stringBuffer("bar"), stringBuffer("baz"));
Mono<String> output =
this.decoder.decodeOne(source, ResolvableType.forClass(String.class), null);
TestSubscriber
.subscribe(output)
.assertNoError()
.assertComplete()
.assertValues("foobarbaz");
}
}
......@@ -20,9 +20,9 @@ import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import org.junit.Ignore;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import org.springframework.core.ResolvableType;
......@@ -30,7 +30,6 @@ import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.MediaType;
import org.springframework.http.codec.Pojo;
import org.springframework.http.codec.json.JacksonJsonDecoder;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
......@@ -61,16 +60,15 @@ public class JacksonJsonDecoderTests extends AbstractDataBufferAllocatingTestCas
}
@Test
@Ignore // Issue 109
public void decodeToList() throws Exception {
Flux<DataBuffer> source = Flux.just(stringBuffer(
"[{\"bar\":\"b1\",\"foo\":\"f1\"},{\"bar\":\"b2\",\"foo\":\"f2\"}]"));
Method method = getClass().getDeclaredMethod("handle", List.class);
ResolvableType elementType = ResolvableType.forMethodParameter(method, 0);
Flux<Object> flux = new JacksonJsonDecoder().decode(source, elementType, null);
Mono<Object> mono = new JacksonJsonDecoder().decodeOne(source, elementType, null);
TestSubscriber.subscribe(flux).assertNoError().assertComplete().
TestSubscriber.subscribe(mono).assertNoError().assertComplete().
assertValues(Arrays.asList(new Pojo("f1", "b1"), new Pojo("f2", "b2")));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册