提交 5c17e2f6 编写于 作者: J Jake Wharton

Add createAsync() to RxJava 1.x CallAdapter.Factory.

This uses Call.enqueue() instead of Call.execute() when invoking the underlying Call.
上级 4ec77317
/*
* Copyright (C) 2016 Jake Wharton
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package retrofit2.adapter.rxjava;
import java.util.concurrent.atomic.AtomicInteger;
import retrofit2.Call;
import retrofit2.Response;
import rx.Producer;
import rx.Subscriber;
import rx.Subscription;
import rx.exceptions.CompositeException;
import rx.exceptions.Exceptions;
import rx.plugins.RxJavaPlugins;
final class CallArbiter<T> extends AtomicInteger implements Subscription, Producer {
private static final int STATE_WAITING = 0;
private static final int STATE_REQUESTED = 1;
private static final int STATE_HAS_RESPONSE = 2;
private static final int STATE_TERMINATED = 3;
private final Call<T> call;
private final Subscriber<? super Response<T>> subscriber;
private volatile Response<T> response;
CallArbiter(Call<T> call, Subscriber<? super Response<T>> subscriber) {
super(STATE_WAITING);
this.call = call;
this.subscriber = subscriber;
}
@Override public void unsubscribe() {
call.cancel();
}
@Override public boolean isUnsubscribed() {
return call.isCanceled();
}
@Override public void request(long amount) {
if (amount == 0) {
return;
}
while (true) {
int state = get();
switch (state) {
case STATE_WAITING:
if (compareAndSet(STATE_WAITING, STATE_REQUESTED)) {
return;
}
break; // State transition failed. Try again.
case STATE_HAS_RESPONSE:
if (compareAndSet(STATE_HAS_RESPONSE, STATE_TERMINATED)) {
deliverResponse(response);
return;
}
break; // State transition failed. Try again.
case STATE_REQUESTED:
case STATE_TERMINATED:
return; // Nothing to do.
default:
throw new IllegalStateException("Unknown state: " + state);
}
}
}
void emitResponse(Response<T> response) {
while (true) {
int state = get();
switch (state) {
case STATE_WAITING:
this.response = response;
if (compareAndSet(STATE_WAITING, STATE_HAS_RESPONSE)) {
return;
}
break; // State transition failed. Try again.
case STATE_REQUESTED:
if (compareAndSet(STATE_REQUESTED, STATE_TERMINATED)) {
deliverResponse(response);
return;
}
break; // State transition failed. Try again.
case STATE_HAS_RESPONSE:
case STATE_TERMINATED:
throw new AssertionError();
default:
throw new IllegalStateException("Unknown state: " + state);
}
}
}
private void deliverResponse(Response<T> response) {
try {
if (!isUnsubscribed()) {
subscriber.onNext(response);
}
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
try {
subscriber.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
CompositeException composite = new CompositeException(t, inner);
RxJavaPlugins.getInstance().getErrorHandler().handleError(composite);
}
return;
}
try {
subscriber.onCompleted();
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
}
}
void emitError(Throwable t) {
set(STATE_TERMINATED);
if (!isUnsubscribed()) {
try {
subscriber.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
CompositeException composite = new CompositeException(t, inner);
RxJavaPlugins.getInstance().getErrorHandler().handleError(composite);
}
}
}
}
/*
* Copyright (C) 2016 Jake Wharton
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package retrofit2.adapter.rxjava;
import retrofit2.Call;
import retrofit2.Callback;
import retrofit2.Response;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.exceptions.Exceptions;
final class CallEnqueueOnSubscribe<T> implements OnSubscribe<Response<T>> {
private final Call<T> originalCall;
CallEnqueueOnSubscribe(Call<T> originalCall) {
this.originalCall = originalCall;
}
@Override public void call(Subscriber<? super Response<T>> subscriber) {
// Since Call is a one-shot type, clone it for each new subscriber.
Call<T> call = originalCall.clone();
final CallArbiter<T> arbiter = new CallArbiter<>(call, subscriber);
subscriber.add(arbiter);
subscriber.setProducer(arbiter);
call.enqueue(new Callback<T>() {
@Override public void onResponse(Call<T> call, Response<T> response) {
arbiter.emitResponse(response);
}
@Override public void onFailure(Call<T> call, Throwable t) {
Exceptions.throwIfFatal(t);
arbiter.emitError(t);
}
});
}
}
/*
* Copyright (C) 2016 Jake Wharton
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package retrofit2.adapter.rxjava;
import retrofit2.Call;
import retrofit2.Response;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.exceptions.Exceptions;
final class CallExecuteOnSubscribe<T> implements OnSubscribe<Response<T>> {
private final Call<T> originalCall;
CallExecuteOnSubscribe(Call<T> originalCall) {
this.originalCall = originalCall;
}
@Override public void call(Subscriber<? super Response<T>> subscriber) {
// Since Call is a one-shot type, clone it for each new subscriber.
Call<T> call = originalCall.clone();
CallArbiter<T> arbiter = new CallArbiter<>(call, subscriber);
subscriber.add(arbiter);
subscriber.setProducer(arbiter);
Response<T> response;
try {
response = call.execute();
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
arbiter.emitError(t);
return;
}
arbiter.emitResponse(response);
}
}
/*
* Copyright (C) 2016 Jake Wharton
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package retrofit2.adapter.rxjava;
import java.util.concurrent.atomic.AtomicInteger;
import retrofit2.Call;
import retrofit2.Response;
import rx.Observable.OnSubscribe;
import rx.Producer;
import rx.Subscriber;
import rx.Subscription;
import rx.exceptions.CompositeException;
import rx.exceptions.Exceptions;
import rx.plugins.RxJavaPlugins;
final class CallOnSubscribe<T> implements OnSubscribe<Response<T>> {
private final Call<T> originalCall;
CallOnSubscribe(Call<T> originalCall) {
this.originalCall = originalCall;
}
@Override public void call(Subscriber<? super Response<T>> subscriber) {
// Since Call is a one-shot type, clone it for each new subscriber.
Call<T> call = originalCall.clone();
CallArbiter<T> arbiter = new CallArbiter<>(call, subscriber);
subscriber.add(arbiter);
subscriber.setProducer(arbiter);
Response<T> response;
try {
response = call.execute();
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
arbiter.emitError(t);
return;
}
arbiter.emitResponse(response);
}
static final class CallArbiter<T> extends AtomicInteger implements Subscription, Producer {
private static final int STATE_WAITING = 0;
private static final int STATE_REQUESTED = 1;
private static final int STATE_HAS_RESPONSE = 2;
private static final int STATE_TERMINATED = 3;
private final Call<T> call;
private final Subscriber<? super Response<T>> subscriber;
private volatile Response<T> response;
CallArbiter(Call<T> call, Subscriber<? super Response<T>> subscriber) {
super(STATE_WAITING);
this.call = call;
this.subscriber = subscriber;
}
@Override public void unsubscribe() {
call.cancel();
}
@Override public boolean isUnsubscribed() {
return call.isCanceled();
}
@Override public void request(long amount) {
if (amount == 0) {
return;
}
while (true) {
int state = get();
switch (state) {
case STATE_WAITING:
if (compareAndSet(STATE_WAITING, STATE_REQUESTED)) {
return;
}
break; // State transition failed. Try again.
case STATE_HAS_RESPONSE:
if (compareAndSet(STATE_HAS_RESPONSE, STATE_TERMINATED)) {
deliverResponse(response);
return;
}
break; // State transition failed. Try again.
case STATE_REQUESTED:
case STATE_TERMINATED:
return; // Nothing to do.
default:
throw new IllegalStateException("Unknown state: " + state);
}
}
}
void emitResponse(Response<T> response) {
while (true) {
int state = get();
switch (state) {
case STATE_WAITING:
this.response = response;
if (compareAndSet(STATE_WAITING, STATE_HAS_RESPONSE)) {
return;
}
break; // State transition failed. Try again.
case STATE_REQUESTED:
if (compareAndSet(STATE_REQUESTED, STATE_TERMINATED)) {
deliverResponse(response);
return;
}
break; // State transition failed. Try again.
case STATE_HAS_RESPONSE:
case STATE_TERMINATED:
throw new AssertionError();
default:
throw new IllegalStateException("Unknown state: " + state);
}
}
}
private void deliverResponse(Response<T> response) {
try {
if (!isUnsubscribed()) {
subscriber.onNext(response);
}
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
try {
subscriber.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
CompositeException composite = new CompositeException(t, inner);
RxJavaPlugins.getInstance().getErrorHandler().handleError(composite);
}
return;
}
try {
subscriber.onCompleted();
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
}
}
void emitError(Throwable t) {
set(STATE_TERMINATED);
if (!isUnsubscribed()) {
try {
subscriber.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
CompositeException composite = new CompositeException(t, inner);
RxJavaPlugins.getInstance().getErrorHandler().handleError(composite);
}
}
}
}
}
......@@ -26,15 +26,17 @@ import rx.Scheduler;
final class RxJavaCallAdapter<R> implements CallAdapter<R, Object> {
private final Type responseType;
private final Scheduler scheduler;
private final boolean isAsync;
private final boolean isResult;
private final boolean isBody;
private final boolean isSingle;
private final boolean isCompletable;
RxJavaCallAdapter(Type responseType, Scheduler scheduler, boolean isResult, boolean isBody,
boolean isSingle, boolean isCompletable) {
RxJavaCallAdapter(Type responseType, Scheduler scheduler, boolean isAsync, boolean isResult,
boolean isBody, boolean isSingle, boolean isCompletable) {
this.responseType = responseType;
this.scheduler = scheduler;
this.isAsync = isAsync;
this.isResult = isResult;
this.isBody = isBody;
this.isSingle = isSingle;
......@@ -46,7 +48,9 @@ final class RxJavaCallAdapter<R> implements CallAdapter<R, Object> {
}
@Override public Object adapt(Call<R> call) {
OnSubscribe<Response<R>> callFunc = new CallOnSubscribe<>(call);
OnSubscribe<Response<R>> callFunc = isAsync
? new CallEnqueueOnSubscribe<>(call)
: new CallExecuteOnSubscribe<>(call);
OnSubscribe<?> func;
if (isResult) {
......
......@@ -61,7 +61,15 @@ public final class RxJavaCallAdapterFactory extends CallAdapter.Factory {
* by default.
*/
public static RxJavaCallAdapterFactory create() {
return new RxJavaCallAdapterFactory(null);
return new RxJavaCallAdapterFactory(null, false);
}
/**
* Returns an instance which creates asynchronous observables. Applying
* {@link Observable#subscribeOn} has no effect on stream types created by this factory.
*/
public static RxJavaCallAdapterFactory createAsync() {
return new RxJavaCallAdapterFactory(null, true);
}
/**
......@@ -70,13 +78,15 @@ public final class RxJavaCallAdapterFactory extends CallAdapter.Factory {
*/
public static RxJavaCallAdapterFactory createWithScheduler(Scheduler scheduler) {
if (scheduler == null) throw new NullPointerException("scheduler == null");
return new RxJavaCallAdapterFactory(scheduler);
return new RxJavaCallAdapterFactory(scheduler, false);
}
private final Scheduler scheduler;
private final boolean isAsync;
private RxJavaCallAdapterFactory(Scheduler scheduler) {
private RxJavaCallAdapterFactory(Scheduler scheduler, boolean isAsync) {
this.scheduler = scheduler;
this.isAsync = isAsync;
}
@Override
......@@ -89,7 +99,7 @@ public final class RxJavaCallAdapterFactory extends CallAdapter.Factory {
}
if (isCompletable) {
return new RxJavaCallAdapter(Void.class, scheduler, false, true, false, true);
return new RxJavaCallAdapter(Void.class, scheduler, isAsync, false, true, false, true);
}
boolean isResult = false;
......@@ -121,6 +131,7 @@ public final class RxJavaCallAdapterFactory extends CallAdapter.Factory {
isBody = true;
}
return new RxJavaCallAdapter(responseType, scheduler, isResult, isBody, isSingle, false);
return new RxJavaCallAdapter(responseType, scheduler, isAsync, isResult, isBody, isSingle,
false);
}
}
/*
* Copyright (C) 2017 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package retrofit2.adapter.rxjava;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import retrofit2.Retrofit;
import retrofit2.http.GET;
import rx.Completable;
import rx.exceptions.CompositeException;
import rx.exceptions.Exceptions;
import rx.observers.AsyncCompletableSubscriber;
import rx.observers.TestSubscriber;
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaPlugins;
import static java.util.concurrent.TimeUnit.SECONDS;
import static okhttp3.mockwebserver.SocketPolicy.DISCONNECT_AFTER_REQUEST;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertFalse;
public final class AsyncTest {
@Rule public final MockWebServer server = new MockWebServer();
@Rule public final TestRule pluginsReset = new RxJavaPluginsResetRule();
interface Service {
@GET("/") Completable completable();
}
private Service service;
@Before public void setUp() {
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(server.url("/"))
.addCallAdapterFactory(RxJavaCallAdapterFactory.createAsync())
.build();
service = retrofit.create(Service.class);
}
@Test public void success() throws InterruptedException {
TestSubscriber<Void> subscriber = new TestSubscriber<>();
service.completable().subscribe(subscriber);
assertFalse(subscriber.awaitValueCount(1, 1, SECONDS));
server.enqueue(new MockResponse());
subscriber.awaitTerminalEvent(1, SECONDS);
subscriber.assertCompleted();
}
@Test public void failure() throws InterruptedException {
TestSubscriber<Void> subscriber = new TestSubscriber<>();
service.completable().subscribe(subscriber);
assertFalse(subscriber.awaitValueCount(1, 1, SECONDS));
server.enqueue(new MockResponse().setSocketPolicy(DISCONNECT_AFTER_REQUEST));
subscriber.awaitTerminalEvent(1, SECONDS);
subscriber.assertError(IOException.class);
}
@Test public void throwingInOnCompleteDeliveredToPlugin() throws InterruptedException {
server.enqueue(new MockResponse());
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> errorRef = new AtomicReference<>();
RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() {
@Override public void handleError(Throwable throwable) {
if (!errorRef.compareAndSet(null, throwable)) {
throw Exceptions.propagate(throwable); // Don't swallow secondary errors!
}
latch.countDown();
}
});
final TestSubscriber<Void> subscriber = new TestSubscriber<>();
final RuntimeException e = new RuntimeException();
service.completable().unsafeSubscribe(new AsyncCompletableSubscriber() {
@Override public void onCompleted() {
throw e;
}
@Override public void onError(Throwable t) {
subscriber.onError(t);
}
});
latch.await(1, SECONDS);
assertThat(errorRef.get()).isSameAs(e);
}
@Test public void bodyThrowingInOnErrorDeliveredToPlugin() throws InterruptedException {
server.enqueue(new MockResponse().setResponseCode(404));
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> pluginRef = new AtomicReference<>();
RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() {
@Override public void handleError(Throwable throwable) {
if (!pluginRef.compareAndSet(null, throwable)) {
throw Exceptions.propagate(throwable); // Don't swallow secondary errors!
}
latch.countDown();
}
});
final TestSubscriber<Void> subscriber = new TestSubscriber<>();
final RuntimeException e = new RuntimeException();
final AtomicReference<Throwable> errorRef = new AtomicReference<>();
service.completable().unsafeSubscribe(new AsyncCompletableSubscriber() {
@Override public void onCompleted() {
subscriber.onCompleted();
}
@Override public void onError(Throwable t) {
errorRef.set(t);
throw e;
}
});
latch.await(1, SECONDS);
//noinspection ThrowableResultOfMethodCallIgnored
CompositeException composite = (CompositeException) pluginRef.get();
assertThat(composite.getExceptions()).containsExactly(errorRef.get(), e);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册