From 729551f375a2fb9675c337873ae6cf4341735139 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 29 Nov 2016 13:27:27 -0500 Subject: [PATCH] Use full package names in ReactiveAdapterRegistry The recent refactoring lead to java.lang.NoClassDefFoundError: io/reactivex/Completable where only RxJava 1 is in the classpath. Most likely due to the lack of prefix in Completable::complete with rx package to avoid the RxJava 2 reference. --- .../core/ReactiveAdapterRegistry.java | 51 ++++++++++--------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java index 37145b006e..7fe7a05bb0 100644 --- a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java +++ b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java @@ -23,10 +23,6 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Function; import io.reactivex.BackpressureStrategy; -import io.reactivex.Completable; -import io.reactivex.Flowable; -import io.reactivex.Maybe; -import io.reactivex.Observable; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -34,6 +30,11 @@ import rx.RxReactiveStreams; import org.springframework.util.ClassUtils; +import static org.springframework.core.ReactiveTypeDescriptor.multiValue; +import static org.springframework.core.ReactiveTypeDescriptor.noValue; +import static org.springframework.core.ReactiveTypeDescriptor.singleOptionalValue; +import static org.springframework.core.ReactiveTypeDescriptor.singleRequiredValue; + /** * A registry of adapters to adapt a Reactive Streams {@link Publisher} to/from * various async/reactive types such as {@code CompletableFuture}, RxJava @@ -135,21 +136,21 @@ public class ReactiveAdapterRegistry { // Flux and Mono ahead of Publisher... registry.registerReactiveType( - ReactiveTypeDescriptor.singleOptionalValue(Mono.class, Mono::empty), + singleOptionalValue(Mono.class, Mono::empty), source -> (Mono) source, Mono::from ); - registry.registerReactiveType(ReactiveTypeDescriptor.multiValue(Flux.class, Flux::empty), + registry.registerReactiveType(multiValue(Flux.class, Flux::empty), source -> (Flux) source, Flux::from); - registry.registerReactiveType(ReactiveTypeDescriptor.multiValue(Publisher.class, Flux::empty), + registry.registerReactiveType(multiValue(Publisher.class, Flux::empty), source -> (Publisher) source, source -> source); registry.registerReactiveType( - ReactiveTypeDescriptor.singleOptionalValue(CompletableFuture.class, () -> { + singleOptionalValue(CompletableFuture.class, () -> { CompletableFuture empty = new CompletableFuture<>(); empty.complete(null); return empty; @@ -164,17 +165,17 @@ public class ReactiveAdapterRegistry { void registerAdapters(ReactiveAdapterRegistry registry) { registry.registerReactiveType( - ReactiveTypeDescriptor.multiValue(rx.Observable.class, rx.Observable::empty), + multiValue(rx.Observable.class, rx.Observable::empty), source -> RxReactiveStreams.toPublisher((rx.Observable) source), RxReactiveStreams::toObservable ); registry.registerReactiveType( - ReactiveTypeDescriptor.singleRequiredValue(rx.Single.class), + singleRequiredValue(rx.Single.class), source -> RxReactiveStreams.toPublisher((rx.Single) source), RxReactiveStreams::toSingle ); registry.registerReactiveType( - ReactiveTypeDescriptor.noValue(rx.Completable.class, Completable::complete), + noValue(rx.Completable.class, rx.Completable::complete), source -> RxReactiveStreams.toPublisher((rx.Completable) source), RxReactiveStreams::toCompletable ); @@ -185,29 +186,29 @@ public class ReactiveAdapterRegistry { void registerAdapters(ReactiveAdapterRegistry registry) { registry.registerReactiveType( - ReactiveTypeDescriptor.multiValue(Flowable.class, Flowable::empty), - source -> (Flowable) source, - source-> Flowable.fromPublisher(source) + multiValue(io.reactivex.Flowable.class, io.reactivex.Flowable::empty), + source -> (io.reactivex.Flowable) source, + source-> io.reactivex.Flowable.fromPublisher(source) ); registry.registerReactiveType( - ReactiveTypeDescriptor.multiValue(Observable.class, Observable::empty), - source -> ((Observable) source).toFlowable(BackpressureStrategy.BUFFER), - source -> Flowable.fromPublisher(source).toObservable() + multiValue(io.reactivex.Observable.class, io.reactivex.Observable::empty), + source -> ((io.reactivex.Observable) source).toFlowable(BackpressureStrategy.BUFFER), + source -> io.reactivex.Flowable.fromPublisher(source).toObservable() ); registry.registerReactiveType( - ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.Single.class), + singleRequiredValue(io.reactivex.Single.class), source -> ((io.reactivex.Single) source).toFlowable(), - source -> Flowable.fromPublisher(source).toObservable().singleElement().toSingle() + source -> io.reactivex.Flowable.fromPublisher(source).toObservable().singleElement().toSingle() ); registry.registerReactiveType( - ReactiveTypeDescriptor.singleOptionalValue(Maybe.class, Maybe::empty), - source -> ((Maybe) source).toFlowable(), - source -> Flowable.fromPublisher(source).toObservable().singleElement() + singleOptionalValue(io.reactivex.Maybe.class, io.reactivex.Maybe::empty), + source -> ((io.reactivex.Maybe) source).toFlowable(), + source -> io.reactivex.Flowable.fromPublisher(source).toObservable().singleElement() ); registry.registerReactiveType( - ReactiveTypeDescriptor.noValue(Completable.class, Completable::complete), - source -> ((Completable) source).toFlowable(), - source -> Flowable.fromPublisher(source).toObservable().ignoreElements() + noValue(io.reactivex.Completable.class, io.reactivex.Completable::complete), + source -> ((io.reactivex.Completable) source).toFlowable(), + source -> io.reactivex.Flowable.fromPublisher(source).toObservable().ignoreElements() ); } } -- GitLab