diff --git a/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/CallArbiter.java b/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/CallArbiter.java new file mode 100644 index 0000000000000000000000000000000000000000..34b0b004a51ddcf7dc8bda7c77e2ab1a0ffa3605 --- /dev/null +++ b/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/CallArbiter.java @@ -0,0 +1,149 @@ +/* + * Copyright (C) 2016 Jake Wharton + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava; + +import java.util.concurrent.atomic.AtomicInteger; +import retrofit2.Call; +import retrofit2.Response; +import rx.Producer; +import rx.Subscriber; +import rx.Subscription; +import rx.exceptions.CompositeException; +import rx.exceptions.Exceptions; +import rx.plugins.RxJavaPlugins; + +final class CallArbiter extends AtomicInteger implements Subscription, Producer { + private static final int STATE_WAITING = 0; + private static final int STATE_REQUESTED = 1; + private static final int STATE_HAS_RESPONSE = 2; + private static final int STATE_TERMINATED = 3; + + private final Call call; + private final Subscriber> subscriber; + + private volatile Response response; + + CallArbiter(Call call, Subscriber> subscriber) { + super(STATE_WAITING); + + this.call = call; + this.subscriber = subscriber; + } + + @Override public void unsubscribe() { + call.cancel(); + } + + @Override public boolean isUnsubscribed() { + return call.isCanceled(); + } + + @Override public void request(long amount) { + if (amount == 0) { + return; + } + while (true) { + int state = get(); + switch (state) { + case STATE_WAITING: + if (compareAndSet(STATE_WAITING, STATE_REQUESTED)) { + return; + } + break; // State transition failed. Try again. + + case STATE_HAS_RESPONSE: + if (compareAndSet(STATE_HAS_RESPONSE, STATE_TERMINATED)) { + deliverResponse(response); + return; + } + break; // State transition failed. Try again. + + case STATE_REQUESTED: + case STATE_TERMINATED: + return; // Nothing to do. + + default: + throw new IllegalStateException("Unknown state: " + state); + } + } + } + + void emitResponse(Response response) { + while (true) { + int state = get(); + switch (state) { + case STATE_WAITING: + this.response = response; + if (compareAndSet(STATE_WAITING, STATE_HAS_RESPONSE)) { + return; + } + break; // State transition failed. Try again. + + case STATE_REQUESTED: + if (compareAndSet(STATE_REQUESTED, STATE_TERMINATED)) { + deliverResponse(response); + return; + } + break; // State transition failed. Try again. + + case STATE_HAS_RESPONSE: + case STATE_TERMINATED: + throw new AssertionError(); + + default: + throw new IllegalStateException("Unknown state: " + state); + } + } + } + + private void deliverResponse(Response response) { + try { + if (!isUnsubscribed()) { + subscriber.onNext(response); + } + } catch (Throwable t) { + Exceptions.throwIfFatal(t); + try { + subscriber.onError(t); + } catch (Throwable inner) { + Exceptions.throwIfFatal(inner); + CompositeException composite = new CompositeException(t, inner); + RxJavaPlugins.getInstance().getErrorHandler().handleError(composite); + } + return; + } + try { + subscriber.onCompleted(); + } catch (Throwable t) { + Exceptions.throwIfFatal(t); + RxJavaPlugins.getInstance().getErrorHandler().handleError(t); + } + } + + void emitError(Throwable t) { + set(STATE_TERMINATED); + + if (!isUnsubscribed()) { + try { + subscriber.onError(t); + } catch (Throwable inner) { + Exceptions.throwIfFatal(inner); + CompositeException composite = new CompositeException(t, inner); + RxJavaPlugins.getInstance().getErrorHandler().handleError(composite); + } + } + } +} diff --git a/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/CallEnqueueOnSubscribe.java b/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/CallEnqueueOnSubscribe.java new file mode 100644 index 0000000000000000000000000000000000000000..7dcf917c3981f2d24fca64583faeb29f7b613aa0 --- /dev/null +++ b/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/CallEnqueueOnSubscribe.java @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2016 Jake Wharton + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava; + +import retrofit2.Call; +import retrofit2.Callback; +import retrofit2.Response; +import rx.Observable.OnSubscribe; +import rx.Subscriber; +import rx.exceptions.Exceptions; + +final class CallEnqueueOnSubscribe implements OnSubscribe> { + private final Call originalCall; + + CallEnqueueOnSubscribe(Call originalCall) { + this.originalCall = originalCall; + } + + @Override public void call(Subscriber> subscriber) { + // Since Call is a one-shot type, clone it for each new subscriber. + Call call = originalCall.clone(); + final CallArbiter arbiter = new CallArbiter<>(call, subscriber); + subscriber.add(arbiter); + subscriber.setProducer(arbiter); + + call.enqueue(new Callback() { + @Override public void onResponse(Call call, Response response) { + arbiter.emitResponse(response); + } + + @Override public void onFailure(Call call, Throwable t) { + Exceptions.throwIfFatal(t); + arbiter.emitError(t); + } + }); + } +} diff --git a/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/CallExecuteOnSubscribe.java b/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/CallExecuteOnSubscribe.java new file mode 100644 index 0000000000000000000000000000000000000000..593770aa77a5916c56e5a6ea364d142bbc54c6cb --- /dev/null +++ b/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/CallExecuteOnSubscribe.java @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2016 Jake Wharton + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava; + +import retrofit2.Call; +import retrofit2.Response; +import rx.Observable.OnSubscribe; +import rx.Subscriber; +import rx.exceptions.Exceptions; + +final class CallExecuteOnSubscribe implements OnSubscribe> { + private final Call originalCall; + + CallExecuteOnSubscribe(Call originalCall) { + this.originalCall = originalCall; + } + + @Override public void call(Subscriber> subscriber) { + // Since Call is a one-shot type, clone it for each new subscriber. + Call call = originalCall.clone(); + CallArbiter arbiter = new CallArbiter<>(call, subscriber); + subscriber.add(arbiter); + subscriber.setProducer(arbiter); + + Response response; + try { + response = call.execute(); + } catch (Throwable t) { + Exceptions.throwIfFatal(t); + arbiter.emitError(t); + return; + } + arbiter.emitResponse(response); + } +} diff --git a/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/CallOnSubscribe.java b/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/CallOnSubscribe.java deleted file mode 100644 index 7e0f53969c7f5a7ceffa0836a6ba0707010f7d2c..0000000000000000000000000000000000000000 --- a/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/CallOnSubscribe.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Copyright (C) 2016 Jake Wharton - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package retrofit2.adapter.rxjava; - -import java.util.concurrent.atomic.AtomicInteger; -import retrofit2.Call; -import retrofit2.Response; -import rx.Observable.OnSubscribe; -import rx.Producer; -import rx.Subscriber; -import rx.Subscription; -import rx.exceptions.CompositeException; -import rx.exceptions.Exceptions; -import rx.plugins.RxJavaPlugins; - -final class CallOnSubscribe implements OnSubscribe> { - private final Call originalCall; - - CallOnSubscribe(Call originalCall) { - this.originalCall = originalCall; - } - - @Override public void call(Subscriber> subscriber) { - // Since Call is a one-shot type, clone it for each new subscriber. - Call call = originalCall.clone(); - CallArbiter arbiter = new CallArbiter<>(call, subscriber); - subscriber.add(arbiter); - subscriber.setProducer(arbiter); - - Response response; - try { - response = call.execute(); - } catch (Throwable t) { - Exceptions.throwIfFatal(t); - arbiter.emitError(t); - return; - } - arbiter.emitResponse(response); - } - - static final class CallArbiter extends AtomicInteger implements Subscription, Producer { - private static final int STATE_WAITING = 0; - private static final int STATE_REQUESTED = 1; - private static final int STATE_HAS_RESPONSE = 2; - private static final int STATE_TERMINATED = 3; - - private final Call call; - private final Subscriber> subscriber; - - private volatile Response response; - - CallArbiter(Call call, Subscriber> subscriber) { - super(STATE_WAITING); - - this.call = call; - this.subscriber = subscriber; - } - - @Override public void unsubscribe() { - call.cancel(); - } - - @Override public boolean isUnsubscribed() { - return call.isCanceled(); - } - - @Override public void request(long amount) { - if (amount == 0) { - return; - } - while (true) { - int state = get(); - switch (state) { - case STATE_WAITING: - if (compareAndSet(STATE_WAITING, STATE_REQUESTED)) { - return; - } - break; // State transition failed. Try again. - - case STATE_HAS_RESPONSE: - if (compareAndSet(STATE_HAS_RESPONSE, STATE_TERMINATED)) { - deliverResponse(response); - return; - } - break; // State transition failed. Try again. - - case STATE_REQUESTED: - case STATE_TERMINATED: - return; // Nothing to do. - - default: - throw new IllegalStateException("Unknown state: " + state); - } - } - } - - void emitResponse(Response response) { - while (true) { - int state = get(); - switch (state) { - case STATE_WAITING: - this.response = response; - if (compareAndSet(STATE_WAITING, STATE_HAS_RESPONSE)) { - return; - } - break; // State transition failed. Try again. - - case STATE_REQUESTED: - if (compareAndSet(STATE_REQUESTED, STATE_TERMINATED)) { - deliverResponse(response); - return; - } - break; // State transition failed. Try again. - - case STATE_HAS_RESPONSE: - case STATE_TERMINATED: - throw new AssertionError(); - - default: - throw new IllegalStateException("Unknown state: " + state); - } - } - } - - private void deliverResponse(Response response) { - try { - if (!isUnsubscribed()) { - subscriber.onNext(response); - } - } catch (Throwable t) { - Exceptions.throwIfFatal(t); - try { - subscriber.onError(t); - } catch (Throwable inner) { - Exceptions.throwIfFatal(inner); - CompositeException composite = new CompositeException(t, inner); - RxJavaPlugins.getInstance().getErrorHandler().handleError(composite); - } - return; - } - try { - subscriber.onCompleted(); - } catch (Throwable t) { - Exceptions.throwIfFatal(t); - RxJavaPlugins.getInstance().getErrorHandler().handleError(t); - } - } - - void emitError(Throwable t) { - set(STATE_TERMINATED); - - if (!isUnsubscribed()) { - try { - subscriber.onError(t); - } catch (Throwable inner) { - Exceptions.throwIfFatal(inner); - CompositeException composite = new CompositeException(t, inner); - RxJavaPlugins.getInstance().getErrorHandler().handleError(composite); - } - } - } - } -} diff --git a/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/RxJavaCallAdapter.java b/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/RxJavaCallAdapter.java index 8d8f3776a25a850e8ba5980dd2c67c50ec915c43..f14c47181adcc13eb0836a123a846456f0428a83 100644 --- a/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/RxJavaCallAdapter.java +++ b/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/RxJavaCallAdapter.java @@ -26,15 +26,17 @@ import rx.Scheduler; final class RxJavaCallAdapter implements CallAdapter { private final Type responseType; private final Scheduler scheduler; + private final boolean isAsync; private final boolean isResult; private final boolean isBody; private final boolean isSingle; private final boolean isCompletable; - RxJavaCallAdapter(Type responseType, Scheduler scheduler, boolean isResult, boolean isBody, - boolean isSingle, boolean isCompletable) { + RxJavaCallAdapter(Type responseType, Scheduler scheduler, boolean isAsync, boolean isResult, + boolean isBody, boolean isSingle, boolean isCompletable) { this.responseType = responseType; this.scheduler = scheduler; + this.isAsync = isAsync; this.isResult = isResult; this.isBody = isBody; this.isSingle = isSingle; @@ -46,7 +48,9 @@ final class RxJavaCallAdapter implements CallAdapter { } @Override public Object adapt(Call call) { - OnSubscribe> callFunc = new CallOnSubscribe<>(call); + OnSubscribe> callFunc = isAsync + ? new CallEnqueueOnSubscribe<>(call) + : new CallExecuteOnSubscribe<>(call); OnSubscribe func; if (isResult) { diff --git a/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/RxJavaCallAdapterFactory.java b/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/RxJavaCallAdapterFactory.java index 28345167917a0003dfbb80469f3aca52c73b26f8..184750d0593f639a560a194104142f4aebc881b3 100644 --- a/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/RxJavaCallAdapterFactory.java +++ b/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/RxJavaCallAdapterFactory.java @@ -61,7 +61,15 @@ public final class RxJavaCallAdapterFactory extends CallAdapter.Factory { * by default. */ public static RxJavaCallAdapterFactory create() { - return new RxJavaCallAdapterFactory(null); + return new RxJavaCallAdapterFactory(null, false); + } + + /** + * Returns an instance which creates asynchronous observables. Applying + * {@link Observable#subscribeOn} has no effect on stream types created by this factory. + */ + public static RxJavaCallAdapterFactory createAsync() { + return new RxJavaCallAdapterFactory(null, true); } /** @@ -70,13 +78,15 @@ public final class RxJavaCallAdapterFactory extends CallAdapter.Factory { */ public static RxJavaCallAdapterFactory createWithScheduler(Scheduler scheduler) { if (scheduler == null) throw new NullPointerException("scheduler == null"); - return new RxJavaCallAdapterFactory(scheduler); + return new RxJavaCallAdapterFactory(scheduler, false); } private final Scheduler scheduler; + private final boolean isAsync; - private RxJavaCallAdapterFactory(Scheduler scheduler) { + private RxJavaCallAdapterFactory(Scheduler scheduler, boolean isAsync) { this.scheduler = scheduler; + this.isAsync = isAsync; } @Override @@ -89,7 +99,7 @@ public final class RxJavaCallAdapterFactory extends CallAdapter.Factory { } if (isCompletable) { - return new RxJavaCallAdapter(Void.class, scheduler, false, true, false, true); + return new RxJavaCallAdapter(Void.class, scheduler, isAsync, false, true, false, true); } boolean isResult = false; @@ -121,6 +131,7 @@ public final class RxJavaCallAdapterFactory extends CallAdapter.Factory { isBody = true; } - return new RxJavaCallAdapter(responseType, scheduler, isResult, isBody, isSingle, false); + return new RxJavaCallAdapter(responseType, scheduler, isAsync, isResult, isBody, isSingle, + false); } } diff --git a/retrofit-adapters/rxjava/src/test/java/retrofit2/adapter/rxjava/AsyncTest.java b/retrofit-adapters/rxjava/src/test/java/retrofit2/adapter/rxjava/AsyncTest.java new file mode 100644 index 0000000000000000000000000000000000000000..2c27037693b791a858a73911b3238453f9e118cb --- /dev/null +++ b/retrofit-adapters/rxjava/src/test/java/retrofit2/adapter/rxjava/AsyncTest.java @@ -0,0 +1,143 @@ +/* + * Copyright (C) 2017 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; +import retrofit2.Retrofit; +import retrofit2.http.GET; +import rx.Completable; +import rx.exceptions.CompositeException; +import rx.exceptions.Exceptions; +import rx.observers.AsyncCompletableSubscriber; +import rx.observers.TestSubscriber; +import rx.plugins.RxJavaErrorHandler; +import rx.plugins.RxJavaPlugins; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static okhttp3.mockwebserver.SocketPolicy.DISCONNECT_AFTER_REQUEST; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertFalse; + +public final class AsyncTest { + @Rule public final MockWebServer server = new MockWebServer(); + @Rule public final TestRule pluginsReset = new RxJavaPluginsResetRule(); + + interface Service { + @GET("/") Completable completable(); + } + + private Service service; + @Before public void setUp() { + Retrofit retrofit = new Retrofit.Builder() + .baseUrl(server.url("/")) + .addCallAdapterFactory(RxJavaCallAdapterFactory.createAsync()) + .build(); + service = retrofit.create(Service.class); + } + + @Test public void success() throws InterruptedException { + TestSubscriber subscriber = new TestSubscriber<>(); + service.completable().subscribe(subscriber); + assertFalse(subscriber.awaitValueCount(1, 1, SECONDS)); + + server.enqueue(new MockResponse()); + subscriber.awaitTerminalEvent(1, SECONDS); + subscriber.assertCompleted(); + } + + + @Test public void failure() throws InterruptedException { + TestSubscriber subscriber = new TestSubscriber<>(); + service.completable().subscribe(subscriber); + assertFalse(subscriber.awaitValueCount(1, 1, SECONDS)); + + server.enqueue(new MockResponse().setSocketPolicy(DISCONNECT_AFTER_REQUEST)); + subscriber.awaitTerminalEvent(1, SECONDS); + subscriber.assertError(IOException.class); + } + + @Test public void throwingInOnCompleteDeliveredToPlugin() throws InterruptedException { + server.enqueue(new MockResponse()); + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference errorRef = new AtomicReference<>(); + RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() { + @Override public void handleError(Throwable throwable) { + if (!errorRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); // Don't swallow secondary errors! + } + latch.countDown(); + } + }); + + final TestSubscriber subscriber = new TestSubscriber<>(); + final RuntimeException e = new RuntimeException(); + service.completable().unsafeSubscribe(new AsyncCompletableSubscriber() { + @Override public void onCompleted() { + throw e; + } + + @Override public void onError(Throwable t) { + subscriber.onError(t); + } + }); + + latch.await(1, SECONDS); + assertThat(errorRef.get()).isSameAs(e); + } + + @Test public void bodyThrowingInOnErrorDeliveredToPlugin() throws InterruptedException { + server.enqueue(new MockResponse().setResponseCode(404)); + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference pluginRef = new AtomicReference<>(); + RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() { + @Override public void handleError(Throwable throwable) { + if (!pluginRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); // Don't swallow secondary errors! + } + latch.countDown(); + } + }); + + final TestSubscriber subscriber = new TestSubscriber<>(); + final RuntimeException e = new RuntimeException(); + final AtomicReference errorRef = new AtomicReference<>(); + service.completable().unsafeSubscribe(new AsyncCompletableSubscriber() { + @Override public void onCompleted() { + subscriber.onCompleted(); + } + + @Override public void onError(Throwable t) { + errorRef.set(t); + throw e; + } + }); + + latch.await(1, SECONDS); + //noinspection ThrowableResultOfMethodCallIgnored + CompositeException composite = (CompositeException) pluginRef.get(); + assertThat(composite.getExceptions()).containsExactly(errorRef.get(), e); + } +}