diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/AbstractDecoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/AbstractDecoder.java index 5e925c5ca3dabccca679576f455a2de39e1f4ad4..1b7af181f0524ec9c25dfd4839c43ffac620f6f9 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/codec/AbstractDecoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/codec/AbstractDecoder.java @@ -20,8 +20,11 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; + import org.springframework.core.ResolvableType; -import org.springframework.core.codec.Decoder; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.util.MimeType; /** @@ -51,4 +54,8 @@ public abstract class AbstractDecoder implements Decoder { anyMatch(mt -> mt.isCompatibleWith(mimeType)); } + @Override + public Mono decodeOne(Publisher inputStream, ResolvableType elementType, MimeType mimeType, Object... hints) { + throw new UnsupportedOperationException(); + } } diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/Decoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/Decoder.java index e3b6804f950f7b74e13607280257c72ec3b8a5db..bd5e611d9af2a73440fc74a52298d7889909e74d 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/codec/Decoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/codec/Decoder.java @@ -20,6 +20,7 @@ import java.util.List; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; @@ -60,6 +61,20 @@ public interface Decoder { Flux decode(Publisher inputStream, ResolvableType elementType, MimeType mimeType, Object... hints); + /** + * Decode a {@link DataBuffer} input stream into a Mono of {@code T}. + * + * @param inputStream the {@code DataBuffer} input stream to decode + * @param elementType the expected type of elements in the output stream; + * this type must have been previously passed to the {@link #canDecode} + * method and it must have returned {@code true}. + * @param mimeType the MIME type associated with the input stream, optional + * @param hints additional information about how to do decode, optional + * @return the output stream with the decoded element + */ + Mono decodeOne(Publisher inputStream, ResolvableType elementType, + MimeType mimeType, Object... hints); + /** * Return the list of MIME types this decoder supports. */ diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/StringDecoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/StringDecoder.java index b5936aabca8bdd774a7535c9ecf278c75c01b7f9..7ff0f172e7b88d29c993bf04b361fe686ff69551 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/codec/StringDecoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/codec/StringDecoder.java @@ -25,6 +25,7 @@ import java.util.function.IntPredicate; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; @@ -94,6 +95,21 @@ public class StringDecoder extends AbstractDecoder { }); } + @Override + public Mono decodeOne(Publisher inputStream, ResolvableType elementType, + MimeType mimeType, Object... hints) { + + Charset charset = getCharset(mimeType); + return Flux.from(inputStream) + .map(dataBuffer -> { + CharBuffer charBuffer = charset.decode(dataBuffer.asByteBuffer()); + DataBufferUtils.release(dataBuffer); + return charBuffer.toString(); + }) + .collect(StringBuilder::new, StringBuilder::append) + .map(StringBuilder::toString); + } + private static Flux splitOnNewline(DataBuffer dataBuffer) { List results = new ArrayList(); int startIdx = 0; diff --git a/spring-web-reactive/src/main/java/org/springframework/http/codec/json/JacksonJsonDecoder.java b/spring-web-reactive/src/main/java/org/springframework/http/codec/json/JacksonJsonDecoder.java index 39bae68c966661c989cbfabe49426a6f018d94ae..a68189f72c97f302a49c973f0de524482a472810 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/codec/json/JacksonJsonDecoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/codec/json/JacksonJsonDecoder.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectReader; import com.fasterxml.jackson.databind.type.TypeFactory; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import org.springframework.core.ResolvableType; import org.springframework.core.codec.CodecException; @@ -46,7 +47,9 @@ public class JacksonJsonDecoder extends AbstractDecoder { private final ObjectMapper mapper; - private final Decoder preProcessor = new JsonObjectDecoder(); + private final Decoder fluxPreProcessor = new JsonObjectDecoder(); + + private final Decoder monoPreProcessor = new JsonObjectDecoder(false); public JacksonJsonDecoder() { @@ -69,7 +72,31 @@ public class JacksonJsonDecoder extends AbstractDecoder { JavaType javaType = typeFactory.constructType(elementType.getType()); ObjectReader reader = this.mapper.readerFor(javaType); - return this.preProcessor.decode(inputStream, elementType, mimeType, hints) + return this.fluxPreProcessor.decode(inputStream, elementType, mimeType, hints) + .map(dataBuffer -> { + try { + Object value = reader.readValue(dataBuffer.asInputStream()); + DataBufferUtils.release(dataBuffer); + return value; + } + catch (IOException e) { + return Flux.error(new CodecException("Error while reading the data", e)); + } + }); + } + + @Override + public Mono decodeOne(Publisher inputStream, ResolvableType elementType, + MimeType mimeType, Object... hints) { + + Assert.notNull(inputStream, "'inputStream' must not be null"); + Assert.notNull(elementType, "'elementType' must not be null"); + TypeFactory typeFactory = this.mapper.getTypeFactory(); + JavaType javaType = typeFactory.constructType(elementType.getType()); + ObjectReader reader = this.mapper.readerFor(javaType); + + return this.monoPreProcessor.decode(inputStream, elementType, mimeType, hints) + .single() .map(dataBuffer -> { try { Object value = reader.readValue(dataBuffer.asInputStream()); diff --git a/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/CodecHttpMessageConverter.java b/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/CodecHttpMessageConverter.java index f61648784e51a3703a8f3f3e84f17455646c2754..eabf0a540758345fa85dc5f3d266147c04177ef1 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/CodecHttpMessageConverter.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/CodecHttpMessageConverter.java @@ -125,6 +125,18 @@ public class CodecHttpMessageConverter implements HttpMessageConverter { return this.decoder.decode(inputMessage.getBody(), type, contentType); } + @Override + public Mono readOne(ResolvableType type, ReactiveHttpInputMessage inputMessage) { + if (this.decoder == null) { + return Mono.error(new IllegalStateException("No decoder set")); + } + MediaType contentType = inputMessage.getHeaders().getContentType(); + if (contentType == null) { + contentType = MediaType.APPLICATION_OCTET_STREAM; + } + return this.decoder.decodeOne(inputMessage.getBody(), type, contentType); + } + @Override public Mono write(Publisher inputStream, ResolvableType type, MediaType contentType, ReactiveHttpOutputMessage outputMessage) { diff --git a/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/HttpMessageConverter.java b/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/HttpMessageConverter.java index ce001c5d086b359ac1337e66044c5058a891652f..0f663457fa6194c98aa37ddad96661fbac8fdb94 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/HttpMessageConverter.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/HttpMessageConverter.java @@ -50,16 +50,27 @@ public interface HttpMessageConverter { List getReadableMediaTypes(); /** - * Read an object of the given type form the given input message, and returns it. + * Read a {@link Flux} of the given type form the given input message, and returns it. * @param type the type of object to return. This type must have previously been * passed to the * {@link #canRead canRead} method of this interface, which must have returned {@code * true}. * @param inputMessage the HTTP input message to read from - * @return the converted object + * @return the converted {@link Flux} of elements */ Flux read(ResolvableType type, ReactiveHttpInputMessage inputMessage); + /** + * Read a {@link Mono} of the given type form the given input message, and returns it. + * @param type the type of object to return. This type must have previously been + * passed to the + * {@link #canRead canRead} method of this interface, which must have returned {@code + * true}. + * @param inputMessage the HTTP input message to read from + * @return the converted {@link Mono} of object + */ + Mono readOne(ResolvableType type, ReactiveHttpInputMessage inputMessage); + /** * Indicates whether the given class can be written by this converter. * @param type the class to test for writability diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/result/method/annotation/RequestBodyArgumentResolver.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/result/method/annotation/RequestBodyArgumentResolver.java index b58123dadd1114fddb371dfbc65fb9f0407fd68f..3f72586ea8a95b97f75e50d1ac20fd0bc4d6fe43 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/result/method/annotation/RequestBodyArgumentResolver.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/result/method/annotation/RequestBodyArgumentResolver.java @@ -23,6 +23,7 @@ import java.util.stream.Collectors; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import rx.Observable; import org.springframework.core.Conventions; import org.springframework.core.MethodParameter; @@ -122,7 +123,8 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve ResolvableType type = ResolvableType.forMethodParameter(parameter); boolean isAsyncType = isAsyncType(type); - ResolvableType elementType = (isAsyncType ? type.getGeneric(0) : type); + boolean isStreamableType = isStreamableType(type); + ResolvableType elementType = (isStreamableType || isAsyncType ? type.getGeneric(0) : type); MediaType mediaType = exchange.getRequest().getHeaders().getContentType(); if (mediaType == null) { @@ -131,23 +133,33 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve for (HttpMessageConverter converter : getMessageConverters()) { if (converter.canRead(elementType, mediaType)) { - Flux elementFlux = converter.read(elementType, exchange.getRequest()); - if (this.validator != null) { - elementFlux= applyValidationIfApplicable(elementFlux, parameter); - } - - if (Mono.class.equals(type.getRawClass())) { - return Mono.just(Mono.from(elementFlux)); - } - else if (Flux.class.equals(type.getRawClass())) { - return Mono.just(elementFlux); - } - else if (isAsyncType) { - return Mono.just(getConversionService().convert(elementFlux, type.getRawClass())); + if (isStreamableType) { + Publisher elements = converter.read(elementType, exchange.getRequest()); + if (this.validator != null) { + elements= applyValidationIfApplicable(elements, parameter); + } + if (Flux.class.equals(type.getRawClass())) { + return Mono.just(elements); + } + else if (isAsyncType && this.conversionService.canConvert(Flux.class, type.getRawClass())) { + return Mono.just(this.conversionService.convert(elements, type.getRawClass())); + } } else { - return elementFlux.next().map(o -> o); + Mono element = converter.readOne(elementType, exchange.getRequest()); + if (this.validator != null) { + element = Mono.from(applyValidationIfApplicable(element, parameter)); + } + if (Mono.class.equals(type.getRawClass())) { + return Mono.just(element); + } + else if (isAsyncType && this.conversionService.canConvert(Mono.class, type.getRawClass())) { + return Mono.just(this.conversionService.convert(element, type.getRawClass())); + } + else { + return (Mono)element; + } } } } @@ -157,23 +169,28 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve private boolean isAsyncType(ResolvableType type) { return (Mono.class.equals(type.getRawClass()) || Flux.class.equals(type.getRawClass()) || - getConversionService().canConvert(Publisher.class, type.getRawClass())); + getConversionService().canConvert(Mono.class, type.getRawClass()) || + getConversionService().canConvert(Flux.class, type.getRawClass())); + } + + private boolean isStreamableType(ResolvableType type) { + return this.conversionService.canConvert(Flux.class, type.getRawClass()); } - protected Flux applyValidationIfApplicable(Flux elementFlux, MethodParameter methodParam) { + protected Publisher applyValidationIfApplicable(Publisher elements, MethodParameter methodParam) { Annotation[] annotations = methodParam.getParameterAnnotations(); for (Annotation ann : annotations) { Validated validAnnot = AnnotationUtils.getAnnotation(ann, Validated.class); if (validAnnot != null || ann.annotationType().getSimpleName().startsWith("Valid")) { Object hints = (validAnnot != null ? validAnnot.value() : AnnotationUtils.getValue(ann)); Object[] validationHints = (hints instanceof Object[] ? (Object[]) hints : new Object[] {hints}); - return elementFlux.map(element -> { + return Flux.from(elements).map(element -> { validate(element, validationHints, methodParam); return element; }); } } - return elementFlux; + return elements; } /** diff --git a/spring-web-reactive/src/test/java/org/springframework/core/codec/StringDecoderTests.java b/spring-web-reactive/src/test/java/org/springframework/core/codec/StringDecoderTests.java index 505dca462ff51ccf3eda126afc0f4c5aea2f5526..adce87826a747bcb8a87c35cd4a48c6193455843 100644 --- a/spring-web-reactive/src/test/java/org/springframework/core/codec/StringDecoderTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/core/codec/StringDecoderTests.java @@ -19,6 +19,7 @@ package org.springframework.core.codec; import org.junit.Before; import org.junit.Test; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.core.test.TestSubscriber; import org.springframework.core.ResolvableType; @@ -96,4 +97,18 @@ public class StringDecoderTests extends AbstractDataBufferAllocatingTestCase { .assertValues(""); } + @Test + public void decodeOne() throws InterruptedException { + this.decoder = new StringDecoder(false); + Flux source = + Flux.just(stringBuffer("foo"), stringBuffer("bar"), stringBuffer("baz")); + Mono output = + this.decoder.decodeOne(source, ResolvableType.forClass(String.class), null); + TestSubscriber + .subscribe(output) + .assertNoError() + .assertComplete() + .assertValues("foobarbaz"); + } + } diff --git a/spring-web-reactive/src/test/java/org/springframework/http/codec/json/JacksonJsonDecoderTests.java b/spring-web-reactive/src/test/java/org/springframework/http/codec/json/JacksonJsonDecoderTests.java index 54f99e66e7a5a1348a8b8059db4bf0b9241c3c9a..0321bd234708a83b10026b7b118a037009b97163 100644 --- a/spring-web-reactive/src/test/java/org/springframework/http/codec/json/JacksonJsonDecoderTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/http/codec/json/JacksonJsonDecoderTests.java @@ -20,9 +20,9 @@ import java.lang.reflect.Method; import java.util.Arrays; import java.util.List; -import org.junit.Ignore; import org.junit.Test; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.core.test.TestSubscriber; import org.springframework.core.ResolvableType; @@ -30,7 +30,6 @@ import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.http.MediaType; import org.springframework.http.codec.Pojo; -import org.springframework.http.codec.json.JacksonJsonDecoder; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -61,16 +60,15 @@ public class JacksonJsonDecoderTests extends AbstractDataBufferAllocatingTestCas } @Test - @Ignore // Issue 109 public void decodeToList() throws Exception { Flux source = Flux.just(stringBuffer( "[{\"bar\":\"b1\",\"foo\":\"f1\"},{\"bar\":\"b2\",\"foo\":\"f2\"}]")); Method method = getClass().getDeclaredMethod("handle", List.class); ResolvableType elementType = ResolvableType.forMethodParameter(method, 0); - Flux flux = new JacksonJsonDecoder().decode(source, elementType, null); + Mono mono = new JacksonJsonDecoder().decodeOne(source, elementType, null); - TestSubscriber.subscribe(flux).assertNoError().assertComplete(). + TestSubscriber.subscribe(mono).assertNoError().assertComplete(). assertValues(Arrays.asList(new Pojo("f1", "b1"), new Pojo("f2", "b2"))); }