提交 12d7b781 编写于 作者: S Sebastien Deleuze 提交者: Rossen Stoyanchev

Refactor reactive type conversion support

This commit replaces Reactive Streams converters for RxJava1 and
CompletableFuture with Reactor specific ones. The results in conversion
that preserves stream semantics, i.e. Mono vs Flux.

For example this is allowed:
Flux -> Observable
Mono -> Single
Mono -> CompletableFuture

This is not allowed:
Flux -> Single
Mono -> Observable
Flux -> CompletableFuture

As a result it is now possible to check through the ConversionService
if a target type to convert to is a stream of many or of one which is
useful for decoding purposes.

The commit also adds PublisherToFluxConverter to allow conversion from
raw Publisher to Flux. The reverse is not necessary since Flux is a
Publisher and it's a no-op conversion.
上级 95239519
......@@ -29,12 +29,12 @@ import org.springframework.core.convert.converter.GenericConverter;
/**
* @author Sebastien Deleuze
*/
public class ReactiveStreamsToCompletableFutureConverter implements GenericConverter {
public class MonoToCompletableFutureConverter implements GenericConverter {
@Override
public Set<ConvertiblePair> getConvertibleTypes() {
Set<GenericConverter.ConvertiblePair> pairs = new LinkedHashSet<>();
pairs.add(new GenericConverter.ConvertiblePair(Publisher.class, CompletableFuture.class));
pairs.add(new GenericConverter.ConvertiblePair(Mono.class, CompletableFuture.class));
pairs.add(new GenericConverter.ConvertiblePair(CompletableFuture.class, Publisher.class));
return pairs;
}
......@@ -44,10 +44,10 @@ public class ReactiveStreamsToCompletableFutureConverter implements GenericConve
if (source == null) {
return null;
}
else if (CompletableFuture.class.isAssignableFrom(source.getClass())) {
else if (CompletableFuture.class.isAssignableFrom(sourceType.getType())) {
return Mono.fromFuture((CompletableFuture) source);
}
else if (CompletableFuture.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) {
else if (CompletableFuture.class.isAssignableFrom(targetType.getType())) {
return Mono.from((Publisher) source).toFuture();
}
return null;
......
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.convert.support;
import java.util.LinkedHashSet;
import java.util.Set;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import org.springframework.core.convert.TypeDescriptor;
import org.springframework.core.convert.converter.GenericConverter;
/**
* @author Sebastien Deleuze
*/
public class PublisherToFluxConverter implements GenericConverter {
@Override
public Set<ConvertiblePair> getConvertibleTypes() {
Set<ConvertiblePair> pairs = new LinkedHashSet<>();
pairs.add(new ConvertiblePair(Publisher.class, Flux.class));
return pairs;
}
@Override
public Object convert(Object source, TypeDescriptor sourceType, TypeDescriptor targetType) {
if (source == null) {
return null;
}
else if (Publisher.class.isAssignableFrom(sourceType.getType())) {
return Flux.from((Publisher)source);
}
return null;
}
}
......@@ -22,6 +22,8 @@ import java.util.Set;
import org.reactivestreams.Publisher;
import reactor.core.converter.RxJava1ObservableConverter;
import reactor.core.converter.RxJava1SingleConverter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.Single;
......@@ -29,18 +31,17 @@ import org.springframework.core.convert.TypeDescriptor;
import org.springframework.core.convert.converter.GenericConverter;
/**
* TODO Avoid classpath exception for older RxJava1 version without Single type
* @author Stephane Maldini
* @author Sebastien Deleuze
*/
public final class ReactiveStreamsToRxJava1Converter implements GenericConverter {
public final class ReactorToRxJava1Converter implements GenericConverter {
@Override
public Set<GenericConverter.ConvertiblePair> getConvertibleTypes() {
Set<GenericConverter.ConvertiblePair> pairs = new LinkedHashSet<>();
pairs.add(new GenericConverter.ConvertiblePair(Publisher.class, Observable.class));
pairs.add(new GenericConverter.ConvertiblePair(Flux.class, Observable.class));
pairs.add(new GenericConverter.ConvertiblePair(Observable.class, Publisher.class));
pairs.add(new GenericConverter.ConvertiblePair(Publisher.class, Single.class));
pairs.add(new GenericConverter.ConvertiblePair(Mono.class, Single.class));
pairs.add(new GenericConverter.ConvertiblePair(Single.class, Publisher.class));
return pairs;
}
......@@ -50,16 +51,16 @@ public final class ReactiveStreamsToRxJava1Converter implements GenericConverter
if (source == null) {
return null;
}
if (Observable.class.isAssignableFrom(source.getClass())) {
if (Observable.class.isAssignableFrom(sourceType.getType())) {
return RxJava1ObservableConverter.from((Observable) source);
}
else if (Observable.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) {
else if (Observable.class.isAssignableFrom(targetType.getType())) {
return RxJava1ObservableConverter.from((Publisher) source);
}
else if (Single.class.isAssignableFrom(source.getClass())) {
else if (Single.class.isAssignableFrom(sourceType.getType())) {
return RxJava1SingleConverter.from((Single) source);
}
else if (Single.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) {
else if (Single.class.isAssignableFrom(targetType.getType())) {
return RxJava1SingleConverter.from((Publisher) source);
}
return null;
......
......@@ -32,6 +32,7 @@ import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.ByteBufferDecoder;
import org.springframework.core.codec.ByteBufferEncoder;
import org.springframework.core.convert.support.PublisherToFluxConverter;
import org.springframework.http.codec.json.JacksonJsonDecoder;
import org.springframework.http.codec.json.JacksonJsonEncoder;
import org.springframework.http.codec.xml.Jaxb2Decoder;
......@@ -41,8 +42,8 @@ import org.springframework.core.codec.StringEncoder;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.convert.converter.ConverterRegistry;
import org.springframework.core.convert.support.GenericConversionService;
import org.springframework.core.convert.support.ReactiveStreamsToCompletableFutureConverter;
import org.springframework.core.convert.support.ReactiveStreamsToRxJava1Converter;
import org.springframework.core.convert.support.MonoToCompletableFutureConverter;
import org.springframework.core.convert.support.ReactorToRxJava1Converter;
import org.springframework.format.Formatter;
import org.springframework.http.MediaType;
import org.springframework.http.codec.SseEventEncoder;
......@@ -285,14 +286,15 @@ public class WebReactiveConfiguration implements ApplicationContextAware {
* Override to add custom {@link Converter}s and {@link Formatter}s.
* <p>By default this method method registers:
* <ul>
* <li>{@link ReactiveStreamsToCompletableFutureConverter}
* <li>{@link ReactiveStreamsToRxJava1Converter}
* <li>{@link MonoToCompletableFutureConverter}
* <li>{@link ReactorToRxJava1Converter}
* </ul>
*/
protected void addFormatters(ConverterRegistry registry) {
registry.addConverter(new ReactiveStreamsToCompletableFutureConverter());
registry.addConverter(new MonoToCompletableFutureConverter());
registry.addConverter(new PublisherToFluxConverter());
if (DependencyUtils.hasRxJava1()) {
registry.addConverter(new ReactiveStreamsToRxJava1Converter());
registry.addConverter(new ReactorToRxJava1Converter());
}
}
......
......@@ -24,13 +24,12 @@ import reactor.core.publisher.Flux;
import rx.Observable;
import org.springframework.core.ResolvableType;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.core.convert.support.GenericConversionService;
import org.springframework.core.convert.support.ReactiveStreamsToCompletableFutureConverter;
import org.springframework.core.convert.support.ReactiveStreamsToRxJava1Converter;
import org.springframework.core.convert.support.MonoToCompletableFutureConverter;
import org.springframework.core.convert.support.PublisherToFluxConverter;
import org.springframework.core.convert.support.ReactorToRxJava1Converter;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.reactive.HandlerResult;
import org.springframework.web.reactive.result.SimpleResultHandler;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
......@@ -46,8 +45,9 @@ public class SimpleResultHandlerTests {
public void supportsWithConversionService() throws NoSuchMethodException {
GenericConversionService conversionService = new GenericConversionService();
conversionService.addConverter(new ReactiveStreamsToCompletableFutureConverter());
conversionService.addConverter(new ReactiveStreamsToRxJava1Converter());
conversionService.addConverter(new MonoToCompletableFutureConverter());
conversionService.addConverter(new PublisherToFluxConverter());
conversionService.addConverter(new ReactorToRxJava1Converter());
SimpleResultHandler resultHandler = new SimpleResultHandler(conversionService);
TestController controller = new TestController();
......
......@@ -40,8 +40,9 @@ import org.springframework.core.ResolvableType;
import org.springframework.core.codec.ByteBufferEncoder;
import org.springframework.core.codec.StringEncoder;
import org.springframework.core.convert.support.GenericConversionService;
import org.springframework.core.convert.support.ReactiveStreamsToCompletableFutureConverter;
import org.springframework.core.convert.support.ReactiveStreamsToRxJava1Converter;
import org.springframework.core.convert.support.MonoToCompletableFutureConverter;
import org.springframework.core.convert.support.PublisherToFluxConverter;
import org.springframework.core.convert.support.ReactorToRxJava1Converter;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.support.DataBufferTestUtils;
......@@ -136,7 +137,6 @@ public class MessageConverterResultHandlerTests {
}
@Test // SPR-12811
@Ignore
public void jacksonTypeOfListElement() throws Exception {
List<ParentClass> body = Arrays.asList(new Foo("foo"), new Bar("bar"));
ResolvableType bodyType = ResolvableType.forClassWithGenerics(List.class, ParentClass.class);
......@@ -185,8 +185,9 @@ public class MessageConverterResultHandlerTests {
}
GenericConversionService service = new GenericConversionService();
service.addConverter(new ReactiveStreamsToCompletableFutureConverter());
service.addConverter(new ReactiveStreamsToRxJava1Converter());
service.addConverter(new MonoToCompletableFutureConverter());
service.addConverter(new PublisherToFluxConverter());
service.addConverter(new ReactorToRxJava1Converter());
RequestedContentTypeResolver resolver = new RequestedContentTypeResolverBuilder().build();
......
......@@ -30,7 +30,6 @@ import java.util.concurrent.CompletableFuture;
import javax.xml.bind.annotation.XmlRootElement;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
......@@ -43,11 +42,12 @@ import org.springframework.core.MethodParameter;
import org.springframework.core.ParameterNameDiscoverer;
import org.springframework.core.annotation.SynthesizingMethodParameter;
import org.springframework.core.codec.Decoder;
import org.springframework.core.convert.support.PublisherToFluxConverter;
import org.springframework.http.codec.json.JacksonJsonDecoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.convert.support.GenericConversionService;
import org.springframework.core.convert.support.ReactiveStreamsToCompletableFutureConverter;
import org.springframework.core.convert.support.ReactiveStreamsToRxJava1Converter;
import org.springframework.core.convert.support.MonoToCompletableFutureConverter;
import org.springframework.core.convert.support.ReactorToRxJava1Converter;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpMethod;
......@@ -169,10 +169,7 @@ public class RequestBodyArgumentResolverTests {
assertEquals(map, resolveValue("map", Map.class, body));
}
// TODO: @Ignore
@Test
@Ignore
public void list() throws Exception {
String body = "[{\"bar\":\"b1\",\"foo\":\"f1\"},{\"bar\":\"b2\",\"foo\":\"f2\"}]";
assertEquals(Arrays.asList(new TestBean("f1", "b1"), new TestBean("f2", "b2")),
......@@ -180,7 +177,6 @@ public class RequestBodyArgumentResolverTests {
}
@Test
@Ignore
public void array() throws Exception {
String body = "[{\"bar\":\"b1\",\"foo\":\"f1\"},{\"bar\":\"b2\",\"foo\":\"f2\"}]";
assertArrayEquals(new TestBean[] {new TestBean("f1", "b1"), new TestBean("f2", "b2")},
......@@ -220,8 +216,9 @@ public class RequestBodyArgumentResolverTests {
List<HttpMessageConverter<?>> converters = new ArrayList<>();
Arrays.asList(decoders).forEach(decoder -> converters.add(new CodecHttpMessageConverter<>(decoder)));
GenericConversionService service = new GenericConversionService();
service.addConverter(new ReactiveStreamsToCompletableFutureConverter());
service.addConverter(new ReactiveStreamsToRxJava1Converter());
service.addConverter(new MonoToCompletableFutureConverter());
service.addConverter(new PublisherToFluxConverter());
service.addConverter(new ReactorToRxJava1Converter());
return new RequestBodyArgumentResolver(converters, service, new TestBeanValidator());
}
......
......@@ -30,8 +30,9 @@ import org.springframework.core.codec.ByteBufferEncoder;
import org.springframework.core.codec.StringEncoder;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.core.convert.support.GenericConversionService;
import org.springframework.core.convert.support.ReactiveStreamsToCompletableFutureConverter;
import org.springframework.core.convert.support.ReactiveStreamsToRxJava1Converter;
import org.springframework.core.convert.support.MonoToCompletableFutureConverter;
import org.springframework.core.convert.support.PublisherToFluxConverter;
import org.springframework.core.convert.support.ReactorToRxJava1Converter;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.http.codec.json.JacksonJsonEncoder;
......@@ -100,8 +101,9 @@ public class ResponseBodyResultHandlerTests {
converterList = Arrays.asList(converters);
}
GenericConversionService service = new GenericConversionService();
service.addConverter(new ReactiveStreamsToCompletableFutureConverter());
service.addConverter(new ReactiveStreamsToRxJava1Converter());
service.addConverter(new MonoToCompletableFutureConverter());
service.addConverter(new PublisherToFluxConverter());
service.addConverter(new ReactorToRxJava1Converter());
RequestedContentTypeResolver resolver = new RequestedContentTypeResolverBuilder().build();
return new ResponseBodyResultHandler(converterList, new DefaultConversionService(), resolver);
......
......@@ -33,8 +33,9 @@ import org.springframework.core.ResolvableType;
import org.springframework.core.codec.ByteBufferEncoder;
import org.springframework.core.codec.StringEncoder;
import org.springframework.core.convert.support.GenericConversionService;
import org.springframework.core.convert.support.ReactiveStreamsToCompletableFutureConverter;
import org.springframework.core.convert.support.ReactiveStreamsToRxJava1Converter;
import org.springframework.core.convert.support.MonoToCompletableFutureConverter;
import org.springframework.core.convert.support.PublisherToFluxConverter;
import org.springframework.core.convert.support.ReactorToRxJava1Converter;
import org.springframework.core.io.buffer.support.DataBufferTestUtils;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
......@@ -105,8 +106,11 @@ public class ResponseEntityResultHandlerTests {
converterList = Arrays.asList(converters);
}
GenericConversionService service = new GenericConversionService();
service.addConverter(new ReactiveStreamsToCompletableFutureConverter());
service.addConverter(new ReactiveStreamsToRxJava1Converter());
service.addConverter(new MonoToCompletableFutureConverter());
service.addConverter(new PublisherToFluxConverter());
service.addConverter(new ReactorToRxJava1Converter());
RequestedContentTypeResolver resolver = new RequestedContentTypeResolverBuilder().build();
return new ResponseEntityResultHandler(converterList, service, resolver);
......
......@@ -39,7 +39,8 @@ import org.springframework.core.Ordered;
import org.springframework.core.ResolvableType;
import org.springframework.core.convert.support.ConfigurableConversionService;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.core.convert.support.ReactiveStreamsToRxJava1Converter;
import org.springframework.core.convert.support.PublisherToFluxConverter;
import org.springframework.core.convert.support.ReactorToRxJava1Converter;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.core.io.buffer.support.DataBufferTestUtils;
......@@ -252,7 +253,8 @@ public class ViewResolutionResultHandlerTests {
private ViewResolutionResultHandler createResultHandler(List<View> defaultViews, ViewResolver... resolvers) {
ConfigurableConversionService service = new DefaultConversionService();
service.addConverter(new ReactiveStreamsToRxJava1Converter());
service.addConverter(new ReactorToRxJava1Converter());
service.addConverter(new PublisherToFluxConverter());
List<ViewResolver> resolverList = Arrays.asList(resolvers);
ViewResolutionResultHandler handler = new ViewResolutionResultHandler(resolverList, service);
handler.setDefaultViews(defaultViews);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册