Commit f7c6c69e authored by Arjen Poutsma

Improve JacksonJsonEncoder

This commit removes the need for the custom Subscriber in
JsonObjectEncoder and replaces it with a higher-level Flux- and Mono-based
solution.
Parent 7f786ce4
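The removed JsonObjectEncoder built the surrounding JSON array with a hand-written SubscriberBarrier that tracked demand and terminal state itself; the reworked JacksonJsonEncoder assembles the same output from ordinary Flux operators. The sketch below is a minimal illustration of that idea, assuming only reactor-core on the classpath; it uses String chunks instead of DataBuffer and a flatMap in place of the Flux.zip used in the actual change, so names like encodeAsJsonArray are illustrative, not part of the commit.

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class JsonArraySketch {

    // Wrap a stream of already-serialized JSON objects in '[' ... ']' with ','
    // separators, using only high-level operators instead of a custom Subscriber.
    static Flux<String> encodeAsJsonArray(Flux<String> serializedObjects) {
        Mono<String> startArray = Mono.just("[");
        Mono<String> endArray = Mono.just("]");
        // Emit each object followed by a separator, then drop the trailing
        // separator so the last element is not followed by a comma.
        Flux<String> withSeparators = serializedObjects
                .flatMap(obj -> Flux.just(obj, ","))
                .skipLast(1);
        return Flux.concat(startArray, withSeparators, endArray);
    }

    public static void main(String[] args) {
        encodeAsJsonArray(Flux.just("{\"a\":1}", "{\"b\":2}"))
                .subscribe(System.out::print);  // prints [{"a":1},{"b":2}]
    }
}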
......@@ -27,54 +27,59 @@ import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.CodecException;
import org.springframework.core.codec.Encoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
/**
* Encode from an {@code Object} stream to a byte stream of JSON objects.
*
* @author Sebastien Deleuze
* @author Arjen Poutsma
* @see JacksonJsonDecoder
*/
public class JacksonJsonEncoder extends AbstractEncoder<Object> {
private final ObjectMapper mapper;
private Encoder<DataBuffer> postProcessor;
public JacksonJsonEncoder() {
this(new ObjectMapper(), null);
}
public JacksonJsonEncoder(Encoder<DataBuffer> postProcessor) {
this(new ObjectMapper(), postProcessor);
this(new ObjectMapper());
}
public JacksonJsonEncoder(ObjectMapper mapper,
Encoder<DataBuffer> postProcessor) {
public JacksonJsonEncoder(ObjectMapper mapper) {
super(new MimeType("application", "json", StandardCharsets.UTF_8),
new MimeType("application", "*+json", StandardCharsets.UTF_8));
Assert.notNull(mapper, "'mapper' must not be null");
this.mapper = mapper;
this.postProcessor = postProcessor;
}
@Override
public Flux<DataBuffer> encode(Publisher<?> inputStream,
DataBufferAllocator allocator, ResolvableType type, MimeType mimeType,
Object... hints) {
if (inputStream instanceof Mono) {
// single object
return Flux.from(inputStream).map(value -> serialize(value, allocator));
}
else {
// array
Mono<DataBuffer> startArray = Mono.just(charBuffer('[', allocator));
Flux<DataBuffer> arraySeparators =
Flux.create(sub -> sub.onNext(charBuffer(',', allocator)));
Mono<DataBuffer> endArray = Mono.just(charBuffer(']', allocator));
Flux<DataBuffer> serializedObjects =
Flux.from(inputStream).map(value -> serialize(value, allocator));
Publisher<DataBuffer> stream = (inputStream instanceof Mono ?
((Mono<?>) inputStream).map(value -> serialize(value, allocator)) :
Flux.from(inputStream).map(value -> serialize(value, allocator)));
// TODO: figure out why using the parameter allocator for the postprocessor
// commits the response too early
DefaultDataBufferAllocator tempAllocator = new DefaultDataBufferAllocator();
Flux<DataBuffer> array = Flux.zip(serializedObjects, arraySeparators)
.flatMap(tuple -> Flux.just(tuple.getT1(), tuple.getT2()));
return (this.postProcessor == null ? Flux.from(stream) :
this.postProcessor.encode(stream, tempAllocator, type, mimeType, hints));
Flux<DataBuffer> arrayWithoutLastSeparator = Flux.from(array).skipLast(1);
return Flux.concat(startArray, arrayWithoutLastSeparator, endArray);
}
}
private DataBuffer serialize(Object value, DataBufferAllocator allocator) {
......@@ -89,4 +94,11 @@ public class JacksonJsonEncoder extends AbstractEncoder<Object> {
return buffer;
}
private DataBuffer charBuffer(char ch, DataBufferAllocator allocator) {
DataBuffer buffer = allocator.allocateBuffer(1);
buffer.write((byte) ch);
return buffer;
}
}
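For reference, a hypothetical caller of the reworked encoder could look like the sketch below. It assumes this snapshot of spring-reactive on the classpath (the DataBufferAllocator types imported above); the Pojo bean is an illustration standing in for the one used by the tests, whose definition is not part of this diff.

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.core.ResolvableType;
import org.springframework.core.codec.support.JacksonJsonEncoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;

public class JacksonJsonEncoderUsage {

    // Assumed simple bean; Jackson serializes it as {"foo":...,"bar":...}.
    public static class Pojo {
        private final String foo;
        private final String bar;
        public Pojo(String foo, String bar) { this.foo = foo; this.bar = bar; }
        public String getFoo() { return this.foo; }
        public String getBar() { return this.bar; }
    }

    public static void main(String[] args) {
        JacksonJsonEncoder encoder = new JacksonJsonEncoder();
        DataBufferAllocator allocator = new DefaultDataBufferAllocator();
        ResolvableType type = ResolvableType.forClass(Pojo.class);

        // A Mono input is written as a single JSON object.
        Flux<DataBuffer> single = encoder.encode(
                Mono.just(new Pojo("foofoo", "barbar")), allocator, type, null);

        // Any other Publisher is wrapped in a JSON array, emitted as the
        // "[", object, ",", object, "]" buffers asserted in the updated tests.
        Flux<DataBuffer> array = encoder.encode(
                Flux.just(new Pojo("foofoo", "barbar"),
                        new Pojo("foofoofoo", "barbarbar")),
                allocator, type, null);
    }
}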
......@@ -43,7 +43,6 @@ import org.springframework.util.MimeType;
* Based on <a href="https://github.com/netty/netty/blob/master/codec/src/main/java/io/netty/handler/codec/json/JsonObjectDecoder.java">Netty JsonObjectDecoder</a>
*
* @author Sebastien Deleuze
* @see JsonObjectEncoder
*/
public class JsonObjectDecoder extends AbstractDecoder<DataBuffer> {
......@@ -74,7 +73,6 @@ public class JsonObjectDecoder extends AbstractDecoder<DataBuffer> {
/**
* @param allocator
* @param maxObjectLength maximum number of bytes a JSON object/array may
* use (including braces and all). Objects exceeding this length are dropped
* and an {@link IllegalStateException} is thrown.
......
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.codec.support;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.subscriber.SubscriberBarrier;
import reactor.core.util.BackpressureUtils;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.util.MimeType;
/**
* Encode a byte stream of individual JSON elements to a byte stream representing:
* - the same JSON object as the input stream if it is a {@link Mono}
* - a JSON array for other kinds of {@link Publisher}
*
* @author Sebastien Deleuze
* @author Stephane Maldini
*
* @see JsonObjectDecoder
*/
public class JsonObjectEncoder extends AbstractEncoder<DataBuffer> {
public JsonObjectEncoder() {
super(new MimeType("application", "json", StandardCharsets.UTF_8),
new MimeType("application", "*+json", StandardCharsets.UTF_8));
}
@Override
public Flux<DataBuffer> encode(Publisher<? extends DataBuffer> inputStream,
DataBufferAllocator allocator,
ResolvableType type, MimeType mimeType, Object... hints) {
if (inputStream instanceof Mono) {
return Flux.from(inputStream);
}
return Flux.from(inputStream)
.lift(s -> new JsonArrayEncoderBarrier(s, allocator));
}
private static class JsonArrayEncoderBarrier
extends SubscriberBarrier<DataBuffer, DataBuffer> {
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<JsonArrayEncoderBarrier> REQUESTED =
AtomicLongFieldUpdater.newUpdater(JsonArrayEncoderBarrier.class, "requested");
static final AtomicIntegerFieldUpdater<JsonArrayEncoderBarrier> TERMINATED =
AtomicIntegerFieldUpdater.newUpdater(JsonArrayEncoderBarrier.class, "terminated");
private final DataBufferAllocator allocator;
private DataBuffer prev = null;
private long count = 0;
private volatile long requested;
private volatile int terminated;
public JsonArrayEncoderBarrier(Subscriber<? super DataBuffer> subscriber,
DataBufferAllocator allocator) {
super(subscriber);
this.allocator = allocator;
}
@Override
protected void doRequest(long n) {
BackpressureUtils.getAndAdd(REQUESTED, this, n);
if(TERMINATED.compareAndSet(this, 1, 2)){
drainLast();
}
else {
super.doRequest(n);
}
}
@Override
protected void doNext(DataBuffer next) {
this.count++;
DataBuffer tmp = this.prev;
this.prev = next;
DataBuffer buffer = allocator.allocateBuffer();
if (this.count == 1) {
buffer.write((byte) '[');
}
if (tmp != null) {
buffer.write(tmp);
}
if (this.count > 1) {
buffer.write((byte) ',');
}
BackpressureUtils.getAndSub(REQUESTED, this, 1L);
subscriber.onNext(buffer);
}
protected void drainLast(){
if(BackpressureUtils.getAndSub(REQUESTED, this, 1L) > 0) {
DataBuffer buffer = allocator.allocateBuffer();
buffer.write(this.prev);
buffer.write((byte) ']');
subscriber.onNext(buffer);
super.doComplete();
}
}
@Override
protected void doComplete() {
if(TERMINATED.compareAndSet(this, 0, 1)) {
drainLast();
}
}
}
}
......@@ -27,6 +27,7 @@ import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Encoder;
......@@ -55,7 +56,7 @@ public class DefaultHttpRequestBuilder implements HttpRequestBuilder {
protected URI url;
protected Flux contentPublisher;
protected Publisher contentPublisher;
protected List<Encoder<?>> messageEncoders;
......@@ -127,7 +128,7 @@ public class DefaultHttpRequestBuilder implements HttpRequestBuilder {
}
public DefaultHttpRequestBuilder content(Object content) {
this.contentPublisher = Flux.just(content);
this.contentPublisher = Mono.just(content);
return this;
}
......
......@@ -56,8 +56,8 @@ public class JacksonJsonEncoderTests extends AbstractAllocatingTestCase {
});
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
testSubscriber.bindTo(output)
.assertValues("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}",
"{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}");
.assertValues("[", "{\"foo\":\"foofoo\",\"bar\":\"barbar\"}", ",",
"{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}", "]");
}
}
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.codec.support;
import java.nio.charset.StandardCharsets;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import org.springframework.core.io.buffer.DataBuffer;
/**
* @author Sebastien Deleuze
*/
public class JsonObjectEncoderTests extends AbstractAllocatingTestCase {
private JsonObjectEncoder encoder;
@Before
public void createEncoder() {
encoder = new JsonObjectEncoder();
}
@Test
public void encodeSingleElementFlux() throws InterruptedException {
Flux<DataBuffer> source =
Flux.just(stringBuffer("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}"));
Flux<String> output =
Flux.from(encoder.encode(source, allocator, null, null)).map(chunk -> {
byte[] b = new byte[chunk.readableByteCount()];
chunk.read(b);
return new String(b, StandardCharsets.UTF_8);
});
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
testSubscriber.bindTo(output)
.assertValues("[", "{\"foo\": \"foofoo\", \"bar\": \"barbar\"}]");
}
@Test
public void encodeSingleElementMono() throws InterruptedException {
Mono<DataBuffer> source =
Mono.just(stringBuffer("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}"));
Flux<String> output =
Flux.from(encoder.encode(source, allocator, null, null)).map(chunk -> {
byte[] b = new byte[chunk.readableByteCount()];
chunk.read(b);
return new String(b, StandardCharsets.UTF_8);
});
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
testSubscriber.bindTo(output)
.assertValues("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}");
}
@Test
public void encodeTwoElementsFlux() throws InterruptedException {
Flux<DataBuffer> source =
Flux.just(stringBuffer("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}"),
stringBuffer("{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}"));
Flux<String> output =
Flux.from(encoder.encode(source, allocator, null, null)).map(chunk -> {
byte[] b = new byte[chunk.readableByteCount()];
chunk.read(b);
return new String(b, StandardCharsets.UTF_8);
});
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
testSubscriber.bindTo(output)
.assertValues("[",
"{\"foo\": \"foofoo\", \"bar\": \"barbar\"},",
"{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}]");
}
@Test
public void encodeThreeElementsFlux() throws InterruptedException {
Flux<DataBuffer> source =
Flux.just(stringBuffer("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}"),
stringBuffer("{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}"),
stringBuffer("{\"foo\": \"foofoofoofoo\", \"bar\": \"barbarbarbar\"}")
);
Flux<String> output =
Flux.from(encoder.encode(source, allocator, null, null)).map(chunk -> {
byte[] b = new byte[chunk.readableByteCount()];
chunk.read(b);
return new String(b, StandardCharsets.UTF_8);
});
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
testSubscriber.bindTo(output)
.assertValues("[",
"{\"foo\": \"foofoo\", \"bar\": \"barbar\"},",
"{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"},",
"{\"foo\": \"foofoofoofoo\", \"bar\": \"barbarbarbar\"}]");
}
}
......@@ -40,7 +40,6 @@ import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.support.ByteBufferEncoder;
import org.springframework.core.codec.support.JacksonJsonEncoder;
import org.springframework.core.codec.support.JsonObjectEncoder;
import org.springframework.core.codec.support.StringEncoder;
import org.springframework.core.convert.ConversionService;
import org.springframework.core.convert.support.GenericConversionService;
......@@ -380,7 +379,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
@Bean
public ResponseBodyResultHandler responseBodyResultHandler() {
List<Encoder<?>> encoders = Arrays.asList(new ByteBufferEncoder(),
new StringEncoder(), new JacksonJsonEncoder(new JsonObjectEncoder()));
new StringEncoder(), new JacksonJsonEncoder());
ResponseBodyResultHandler resultHandler = new ResponseBodyResultHandler(encoders, conversionService());
resultHandler.setOrder(1);
return resultHandler;
......