From f4d8c7cc2b6e3add1b1b06dc487cce82dceb4a49 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 14 Dec 2017 16:59:34 -0500 Subject: [PATCH] Improve decodeToMono support This commit ensures uniform support for decodeToMono across the various byte and String related decoders. Issue: SPR-16253 --- .../core/codec/AbstractDataBufferDecoder.java | 76 +++++++++++++++++++ .../core/codec/ByteArrayDecoder.java | 20 ++--- .../core/codec/ByteBufferDecoder.java | 21 +++-- .../core/codec/DataBufferDecoder.java | 11 ++- .../core/codec/ResourceDecoder.java | 27 +++---- .../core/codec/StringDecoder.java | 19 ++--- .../core/codec/ByteArrayDecoderTests.java | 19 ++++- .../core/codec/ByteBufferDecoderTests.java | 16 ++++ .../core/codec/DataBufferDecoderTests.java | 22 +++++- 9 files changed, 174 insertions(+), 57 deletions(-) create mode 100644 spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java diff --git a/spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java new file mode 100644 index 0000000000..d5bdf0feed --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java @@ -0,0 +1,76 @@ +/* + * Copyright 2002-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.codec; + +import java.util.Map; + +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.core.ResolvableType; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.lang.Nullable; +import org.springframework.util.MimeType; + +/** + * Abstract base class for {@code Decoder} implementations that can decode + * a {@code DataBuffer} directly to the target element type. + * + *

Sub-classes must implement {@link #decodeDataBuffer} to provide a way to + * transform a {@code DataBuffer} to the target data type. The default + * {@link #decode} implementation transforms each individual data buffer while + * {@link #decodeToMono} applies "reduce" and transforms the aggregated buffer. + * + *

Sub-classes can override {@link #decode} in order to split the input stream + * along different boundaries (e.g. on new line characters for {@code String}) + * or always reduce to a single data buffer (e.g. {@code Resource}). + * + * @author Rossen Stoyanchev + * @since 5.0 + */ +public abstract class AbstractDataBufferDecoder extends AbstractDecoder { + + + protected AbstractDataBufferDecoder(MimeType... supportedMimeTypes) { + super(supportedMimeTypes); + } + + + @Override + public Flux decode(Publisher inputStream, ResolvableType elementType, + @Nullable MimeType mimeType, @Nullable Map hints) { + + return Flux.from(inputStream).map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints)); + } + + @Override + public Mono decodeToMono(Publisher inputStream, ResolvableType elementType, + @Nullable MimeType mimeType, @Nullable Map hints) { + + return Flux.from(inputStream) + .reduce(DataBuffer::write) + .map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints)); + } + + /** + * How to decode a {@code DataBuffer} to the target element type. + */ + protected abstract T decodeDataBuffer(DataBuffer buffer, ResolvableType elementType, + @Nullable MimeType mimeType, @Nullable Map hints); + +} diff --git a/spring-core/src/main/java/org/springframework/core/codec/ByteArrayDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/ByteArrayDecoder.java index d2587a8385..cf8a34c105 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/ByteArrayDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/ByteArrayDecoder.java @@ -18,9 +18,6 @@ package org.springframework.core.codec; import java.util.Map; -import org.reactivestreams.Publisher; -import reactor.core.publisher.Flux; - import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferUtils; @@ -32,9 +29,11 @@ import org.springframework.util.MimeTypeUtils; * Decoder for {@code byte} arrays. * * @author Arjen Poutsma + * @author Rossen Stoyanchev * @since 5.0 */ -public class ByteArrayDecoder extends AbstractDecoder { +public class ByteArrayDecoder extends AbstractDataBufferDecoder { + public ByteArrayDecoder() { super(MimeTypeUtils.ALL); @@ -48,16 +47,13 @@ public class ByteArrayDecoder extends AbstractDecoder { } @Override - public Flux decode(Publisher inputStream, ResolvableType elementType, + protected byte[] decodeDataBuffer(DataBuffer dataBuffer, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { - return Flux.from(inputStream).map((dataBuffer) -> { - byte[] result = new byte[dataBuffer.readableByteCount()]; - dataBuffer.read(result); - DataBufferUtils.release(dataBuffer); - return result ; - }); + byte[] result = new byte[dataBuffer.readableByteCount()]; + dataBuffer.read(result); + DataBufferUtils.release(dataBuffer); + return result; } - } diff --git a/spring-core/src/main/java/org/springframework/core/codec/ByteBufferDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/ByteBufferDecoder.java index 391a2fcdc6..df4a642f54 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/ByteBufferDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/ByteBufferDecoder.java @@ -19,9 +19,6 @@ package org.springframework.core.codec; import java.nio.ByteBuffer; import java.util.Map; -import org.reactivestreams.Publisher; -import reactor.core.publisher.Flux; - import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferUtils; @@ -34,9 +31,11 @@ import org.springframework.util.MimeTypeUtils; * * @author Sebastien Deleuze * @author Arjen Poutsma + * @author Rossen Stoyanchev * @since 5.0 */ -public class ByteBufferDecoder extends AbstractDecoder { +public class ByteBufferDecoder extends AbstractDataBufferDecoder { + public ByteBufferDecoder() { super(MimeTypeUtils.ALL); @@ -50,16 +49,14 @@ public class ByteBufferDecoder extends AbstractDecoder { } @Override - public Flux decode(Publisher inputStream, ResolvableType elementType, + protected ByteBuffer decodeDataBuffer(DataBuffer dataBuffer, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { - return Flux.from(inputStream).map((dataBuffer) -> { - ByteBuffer copy = ByteBuffer.allocate(dataBuffer.readableByteCount()); - copy.put(dataBuffer.asByteBuffer()); - copy.flip(); - DataBufferUtils.release(dataBuffer); - return copy; - }); + ByteBuffer copy = ByteBuffer.allocate(dataBuffer.readableByteCount()); + copy.put(dataBuffer.asByteBuffer()); + copy.flip(); + DataBufferUtils.release(dataBuffer); + return copy; } } diff --git a/spring-core/src/main/java/org/springframework/core/codec/DataBufferDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/DataBufferDecoder.java index 02fa331691..25e2d5f482 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/DataBufferDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/DataBufferDecoder.java @@ -34,9 +34,11 @@ import org.springframework.util.MimeTypeUtils; * {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer)}. * * @author Arjen Poutsma + * @author Rossen Stoyanchev * @since 5.0 */ -public class DataBufferDecoder extends AbstractDecoder { +public class DataBufferDecoder extends AbstractDataBufferDecoder { + public DataBufferDecoder() { super(MimeTypeUtils.ALL); @@ -56,4 +58,11 @@ public class DataBufferDecoder extends AbstractDecoder { return Flux.from(inputStream); } + @Override + protected DataBuffer decodeDataBuffer(DataBuffer buffer, ResolvableType elementType, + @Nullable MimeType mimeType, @Nullable Map hints) { + + return buffer; + } + } diff --git a/spring-core/src/main/java/org/springframework/core/codec/ResourceDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/ResourceDecoder.java index 75b49c6f9a..e46839085a 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/ResourceDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/ResourceDecoder.java @@ -21,7 +21,6 @@ import java.util.Map; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; import org.springframework.core.ResolvableType; import org.springframework.core.io.ByteArrayResource; @@ -38,9 +37,11 @@ import org.springframework.util.MimeTypeUtils; * Decoder for {@link Resource}s. * * @author Arjen Poutsma + * @author Rossen Stoyanchev * @since 5.0 */ -public class ResourceDecoder extends AbstractDecoder { +public class ResourceDecoder extends AbstractDataBufferDecoder { + public ResourceDecoder() { super(MimeTypeUtils.ALL); @@ -63,30 +64,24 @@ public class ResourceDecoder extends AbstractDecoder { } @Override - public Mono decodeToMono(Publisher inputStream, ResolvableType elementType, + protected Resource decodeDataBuffer(DataBuffer dataBuffer, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { + byte[] bytes = new byte[dataBuffer.readableByteCount()]; + dataBuffer.read(bytes); + DataBufferUtils.release(dataBuffer); + Class clazz = elementType.getRawClass(); Assert.state(clazz != null, "No resource class"); - Mono byteArray = Flux.from(inputStream). - reduce(DataBuffer::write). - map(dataBuffer -> { - byte[] bytes = new byte[dataBuffer.readableByteCount()]; - dataBuffer.read(bytes); - DataBufferUtils.release(dataBuffer); - return bytes; - }); - - if (InputStreamResource.class == clazz) { - return Mono.from(byteArray.map(ByteArrayInputStream::new).map(InputStreamResource::new)); + return new InputStreamResource(new ByteArrayInputStream(bytes)); } else if (clazz.isAssignableFrom(ByteArrayResource.class)) { - return Mono.from(byteArray.map(ByteArrayResource::new)); + return new ByteArrayResource(bytes); } else { - return Mono.error(new IllegalStateException("Unsupported resource class: " + clazz)); + throw new IllegalStateException("Unsupported resource class: " + clazz); } } diff --git a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java index 555430e77d..fc403bebd9 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java @@ -26,7 +26,6 @@ import java.util.function.IntPredicate; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; @@ -49,7 +48,7 @@ import org.springframework.util.MimeTypeUtils; * @since 5.0 * @see CharSequenceEncoder */ -public class StringDecoder extends AbstractDecoder { +public class StringDecoder extends AbstractDataBufferDecoder { public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8; @@ -84,16 +83,7 @@ public class StringDecoder extends AbstractDecoder { if (this.splitOnNewline) { inputFlux = Flux.from(inputStream).flatMap(StringDecoder::splitOnNewline); } - return inputFlux.map(buffer -> decodeDataBuffer(buffer, mimeType)); - } - - @Override - public Mono decodeToMono(Publisher inputStream, ResolvableType elementType, - @Nullable MimeType mimeType, @Nullable Map hints) { - - return Flux.from(inputStream) - .reduce(DataBuffer::write) - .map(buffer -> decodeDataBuffer(buffer, mimeType)); + return super.decode(inputFlux, elementType, mimeType, hints); } private static Flux splitOnNewline(DataBuffer dataBuffer) { @@ -113,7 +103,10 @@ public class StringDecoder extends AbstractDecoder { return Flux.fromIterable(results); } - private String decodeDataBuffer(DataBuffer dataBuffer, @Nullable MimeType mimeType) { + @Override + protected String decodeDataBuffer(DataBuffer dataBuffer, ResolvableType elementType, + @Nullable MimeType mimeType, @Nullable Map hints) { + Charset charset = getCharset(mimeType); CharBuffer charBuffer = charset.decode(dataBuffer.asByteBuffer()); DataBufferUtils.release(dataBuffer); diff --git a/spring-core/src/test/java/org/springframework/core/codec/ByteArrayDecoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/ByteArrayDecoderTests.java index 7f6aed0aa7..2635ea2dbc 100644 --- a/spring-core/src/test/java/org/springframework/core/codec/ByteArrayDecoderTests.java +++ b/spring-core/src/test/java/org/springframework/core/codec/ByteArrayDecoderTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import java.util.Collections; import org.junit.Test; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import org.springframework.core.ResolvableType; @@ -39,6 +40,7 @@ public class ByteArrayDecoderTests extends AbstractDataBufferAllocatingTestCase private final ByteArrayDecoder decoder = new ByteArrayDecoder(); + @Test public void canDecode() { assertTrue(this.decoder.canDecode(ResolvableType.forClass(byte[].class), @@ -65,4 +67,19 @@ public class ByteArrayDecoderTests extends AbstractDataBufferAllocatingTestCase .verify(); } + @Test + public void decodeToMono() { + DataBuffer fooBuffer = stringBuffer("foo"); + DataBuffer barBuffer = stringBuffer("bar"); + Flux source = Flux.just(fooBuffer, barBuffer); + Mono output = this.decoder.decodeToMono(source, + ResolvableType.forClassWithGenerics(Publisher.class, byte[].class), + null, Collections.emptyMap()); + + StepVerifier.create(output) + .consumeNextWith(bytes -> assertArrayEquals("foobar".getBytes(), bytes)) + .expectComplete() + .verify(); + } + } \ No newline at end of file diff --git a/spring-core/src/test/java/org/springframework/core/codec/ByteBufferDecoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/ByteBufferDecoderTests.java index 34e9803716..4068822e6a 100644 --- a/spring-core/src/test/java/org/springframework/core/codec/ByteBufferDecoderTests.java +++ b/spring-core/src/test/java/org/springframework/core/codec/ByteBufferDecoderTests.java @@ -22,6 +22,7 @@ import java.util.Collections; import org.junit.Test; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import org.springframework.core.ResolvableType; @@ -63,4 +64,19 @@ public class ByteBufferDecoderTests extends AbstractDataBufferAllocatingTestCase .expectComplete() .verify(); } + + @Test + public void decodeToMono() { + DataBuffer fooBuffer = stringBuffer("foo"); + DataBuffer barBuffer = stringBuffer("bar"); + Flux source = Flux.just(fooBuffer, barBuffer); + Mono output = this.decoder.decodeToMono(source, + ResolvableType.forClassWithGenerics(Publisher.class, ByteBuffer.class), + null, Collections.emptyMap()); + + StepVerifier.create(output) + .expectNext(ByteBuffer.wrap("foobar".getBytes())) + .expectComplete() + .verify(); + } } diff --git a/spring-core/src/test/java/org/springframework/core/codec/DataBufferDecoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/DataBufferDecoderTests.java index 704db7135f..b6c277f399 100644 --- a/spring-core/src/test/java/org/springframework/core/codec/DataBufferDecoderTests.java +++ b/spring-core/src/test/java/org/springframework/core/codec/DataBufferDecoderTests.java @@ -17,15 +17,19 @@ package org.springframework.core.codec; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.Collections; import org.junit.Test; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase; import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.support.DataBufferTestUtils; import org.springframework.util.MimeTypeUtils; import static org.junit.Assert.*; @@ -53,11 +57,25 @@ public class DataBufferDecoderTests extends AbstractDataBufferAllocatingTestCase DataBuffer barBuffer = stringBuffer("bar"); Flux source = Flux.just(fooBuffer, barBuffer); Flux output = this.decoder.decode(source, - ResolvableType.forClassWithGenerics(Publisher.class, ByteBuffer.class), + ResolvableType.forClassWithGenerics(Publisher.class, DataBuffer.class), null, Collections.emptyMap()); assertSame(source, output); release(fooBuffer, barBuffer); } -} + + @Test + public void decodeToMono() { + DataBuffer fooBuffer = stringBuffer("foo"); + DataBuffer barBuffer = stringBuffer("bar"); + Flux source = Flux.just(fooBuffer, barBuffer); + Mono output = this.decoder.decodeToMono(source, + ResolvableType.forClassWithGenerics(Publisher.class, DataBuffer.class), + null, Collections.emptyMap()); + + DataBuffer outputBuffer = output.block(Duration.ofSeconds(5)); + assertEquals("foobar", DataBufferTestUtils.dumpString(outputBuffer, StandardCharsets.UTF_8)); + + release(outputBuffer); + }} -- GitLab