diff --git a/retrofit-adapters/rxjava2/src/main/java/retrofit2/adapter/rxjava2/RxJava2CallAdapter.java b/retrofit-adapters/rxjava2/src/main/java/retrofit2/adapter/rxjava2/RxJava2CallAdapter.java index eccd30da3611b534196f7ce346f6909f26beca15..254f921ad517001a8b1f07e32ed7dd1dc8a68d3a 100644 --- a/retrofit-adapters/rxjava2/src/main/java/retrofit2/adapter/rxjava2/RxJava2CallAdapter.java +++ b/retrofit-adapters/rxjava2/src/main/java/retrofit2/adapter/rxjava2/RxJava2CallAdapter.java @@ -18,6 +18,7 @@ package retrofit2.adapter.rxjava2; import io.reactivex.BackpressureStrategy; import io.reactivex.Observable; import io.reactivex.Scheduler; +import io.reactivex.plugins.RxJavaPlugins; import java.lang.reflect.Type; import javax.annotation.Nullable; import retrofit2.Call; @@ -83,6 +84,6 @@ final class RxJava2CallAdapter implements CallAdapter { if (isCompletable) { return observable.ignoreElements(); } - return observable; + return RxJavaPlugins.onAssembly(observable); } } diff --git a/retrofit-adapters/rxjava2/src/test/java/retrofit2/adapter/rxjava2/ObservableTest.java b/retrofit-adapters/rxjava2/src/test/java/retrofit2/adapter/rxjava2/ObservableTest.java index e421c165269a8471f54eb7ed955738f893744a86..23c20f77e37e3df924ce87b42be7b740ec21719b 100644 --- a/retrofit-adapters/rxjava2/src/test/java/retrofit2/adapter/rxjava2/ObservableTest.java +++ b/retrofit-adapters/rxjava2/src/test/java/retrofit2/adapter/rxjava2/ObservableTest.java @@ -16,6 +16,8 @@ package retrofit2.adapter.rxjava2; import io.reactivex.Observable; +import io.reactivex.functions.Function; +import io.reactivex.plugins.RxJavaPlugins; import java.io.IOException; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; @@ -133,4 +135,18 @@ public final class ObservableTest { assertThat(result.error()).isInstanceOf(IOException.class); observer.assertComplete(); } + + @Test public void observableAssembly() { + try { + final Observable justMe = Observable.just("me"); + RxJavaPlugins.setOnObservableAssembly(new Function() { + @Override public Observable apply(Observable f) { + return justMe; + } + }); + assertThat(service.body()).isEqualTo(justMe); + } finally { + RxJavaPlugins.reset(); + } + } }