提交 04f62bea 编写于 作者: J Jake Wharton

Merge branch 'chrisjenx-fix/rxjava-0-18'

......@@ -51,7 +51,7 @@
<android.platform>16</android.platform>
<gson.version>2.2.4</gson.version>
<okhttp.version>1.3.0</okhttp.version>
<rxjava.version>0.17.1</rxjava.version>
<rxjava.version>0.18.3</rxjava.version>
<appengine.version>1.8.9</appengine.version>
<!-- Converter Dependencies -->
......
......@@ -8,13 +8,12 @@ import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import retrofit.client.Request;
import retrofit.client.Response;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import static retrofit.RestAdapter.LogLevel;
import static retrofit.RetrofitError.unexpectedError;
......@@ -526,30 +525,36 @@ public final class MockRestAdapter {
/** Indirection to avoid VerifyError if RxJava isn't present. */
private static class MockRxSupport {
private final Scheduler scheduler;
private final Executor httpExecutor;
private final ErrorHandler errorHandler;
MockRxSupport(RestAdapter restAdapter) {
scheduler = Schedulers.executor(restAdapter.httpExecutor);
httpExecutor = restAdapter.httpExecutor;
errorHandler = restAdapter.errorHandler;
}
Observable createMockObservable(final MockHandler mockHandler, final RestMethodInfo methodInfo,
final RequestInterceptor interceptor, final Object[] args) {
return Observable.create(new Observable.OnSubscribe<Object>() {
@Override public void call(Subscriber<? super Object> subscriber) {
try {
Observable observable =
(Observable) mockHandler.invokeSync(methodInfo, interceptor, args);
//noinspection unchecked
observable.subscribe(subscriber);
} catch (RetrofitError e) {
subscriber.onError(errorHandler.handleError(e));
} catch (Throwable e) {
subscriber.onError(e);
}
@Override public void call(final Subscriber<? super Object> subscriber) {
if (subscriber.isUnsubscribed()) return;
httpExecutor.execute(new Runnable() {
@Override public void run() {
try {
if (subscriber.isUnsubscribed()) return;
Observable observable =
(Observable) mockHandler.invokeSync(methodInfo, interceptor, args);
//noinspection unchecked
observable.subscribe(subscriber);
} catch (RetrofitError e) {
subscriber.onError(errorHandler.handleError(e));
} catch (Throwable e) {
subscriber.onError(e);
}
}
});
}
}).subscribeOn(scheduler);
});
}
}
}
......@@ -38,10 +38,6 @@ import retrofit.mime.MimeUtil;
import retrofit.mime.TypedByteArray;
import retrofit.mime.TypedInput;
import retrofit.mime.TypedOutput;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.schedulers.Schedulers;
/**
* Adapts a Java interface to a REST API.
......@@ -222,40 +218,6 @@ public class RestAdapter {
}
}
/** Indirection to avoid VerifyError if RxJava isn't present. */
private static final class RxSupport {
private final Scheduler scheduler;
private final ErrorHandler errorHandler;
RxSupport(Executor executor, ErrorHandler errorHandler) {
this.scheduler = Schedulers.executor(executor);
this.errorHandler = errorHandler;
}
Observable createRequestObservable(final Callable<ResponseWrapper> request) {
return Observable.create(new Observable.OnSubscribe<Object>() {
@Override public void call(Subscriber<? super Object> subscriber) {
if (subscriber.isUnsubscribed()) {
return;
}
try {
ResponseWrapper wrapper = request.call();
if (subscriber.isUnsubscribed()) {
return;
}
subscriber.onNext(wrapper.responseBody);
subscriber.onCompleted();
} catch (RetrofitError e) {
subscriber.onError(errorHandler.handleError(e));
} catch (Exception e) {
// This is from the Callable. It shouldn't actually throw.
throw new RuntimeException(e);
}
}
}).subscribeOn(scheduler);
}
}
private class RestHandler implements InvocationHandler {
private final Map<Method, RestMethodInfo> methodDetailsCache;
......
package retrofit;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import rx.Observable;
import rx.Subscriber;
import rx.subscriptions.Subscriptions;
/**
* Utilities for supporting RxJava Observables.
* <p>
* RxJava might not be on the available to use. Check {@link Platform#HAS_RX_JAVA} before calling.
*/
final class RxSupport {
private final Executor executor;
private final ErrorHandler errorHandler;
RxSupport(Executor executor, ErrorHandler errorHandler) {
this.executor = executor;
this.errorHandler = errorHandler;
}
Observable createRequestObservable(final Callable<ResponseWrapper> request) {
return Observable.create(new Observable.OnSubscribe<Object>() {
@Override public void call(Subscriber<? super Object> subscriber) {
if (subscriber.isUnsubscribed()) {
return;
}
FutureTask<Void> task = new FutureTask<Void>(getRunnable(subscriber, request), null);
// Subscribe to the future task of the network call allowing unsubscription.
subscriber.add(Subscriptions.from(task));
executor.execute(task);
}
});
}
private Runnable getRunnable(final Subscriber<? super Object> subscriber,
final Callable<ResponseWrapper> request) {
return new Runnable() {
@Override public void run() {
try {
if (subscriber.isUnsubscribed()) {
return;
}
ResponseWrapper wrapper = request.call();
subscriber.onNext(wrapper.responseBody);
subscriber.onCompleted();
} catch (RetrofitError e) {
subscriber.onError(errorHandler.handleError(e));
} catch (Exception e) {
// This is from the Callable. It shouldn't actually throw.
throw new RuntimeException(e);
}
}
};
}
}
package retrofit;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import retrofit.client.Header;
import retrofit.client.Response;
import retrofit.mime.TypedInput;
import rx.Observer;
import rx.Subscription;
import rx.schedulers.Schedulers;
import rx.schedulers.TestScheduler;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class RxSupportTest {
private Object response;
private ResponseWrapper responseWrapper;
private Callable<ResponseWrapper> callable = spy(new Callable<ResponseWrapper>() {
@Override public ResponseWrapper call() throws Exception {
return responseWrapper;
}
});
private QueuedSynchronousExecutor executor;
private RxSupport rxSupport;
@Mock Observer<Object> subscriber;
@Before public void setUp() {
MockitoAnnotations.initMocks(this);
response = new Object();
responseWrapper = new ResponseWrapper(
new Response(
"http://example.com", 200, "Success",
Collections.<Header>emptyList(), mock(TypedInput.class)
), response
);
executor = spy(new QueuedSynchronousExecutor());
rxSupport = new RxSupport(executor, ErrorHandler.DEFAULT);
}
@Test public void testObservableCallsOnNextOnHttpExecutor() {
rxSupport.createRequestObservable(callable).subscribe(subscriber);
executor.executeNextInQueue();
verify(subscriber, times(1)).onNext(response);
}
@Test public void testObservableCallsOnNextOnHttpExecutorWithSubscriber() {
TestScheduler test = Schedulers.test();
rxSupport.createRequestObservable(callable).subscribeOn(test).subscribe(subscriber);
// Subscription is handled via the Scheduler.
test.triggerActions();
// This will only execute up to the executor in OnSubscribe.
verify(subscriber, never()).onNext(any());
// Upon continuing the executor we then run the retrofit request.
executor.executeNextInQueue();
verify(subscriber, times(1)).onNext(response);
}
@Test public void testObservableUnSubscribesDoesNotExecuteCallable() throws Exception {
Subscription subscription = rxSupport.createRequestObservable(callable).subscribe(subscriber);
verify(subscriber, never()).onNext(any());
// UnSubscribe here should cancel the queued runnable.
subscription.unsubscribe();
executor.executeNextInQueue();
verify(callable, never()).call();
verify(subscriber, never()).onNext(response);
}
@Test public void testObservableCallsOperatorsOffHttpExecutor() {
TestScheduler test = Schedulers.test();
rxSupport.createRequestObservable(callable)
.delaySubscription(1000, TimeUnit.MILLISECONDS, test)
.subscribe(subscriber);
verify(subscriber, never()).onNext(any());
test.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
// Upon continuing the executor we then run the retrofit request.
executor.executeNextInQueue();
verify(subscriber, times(1)).onNext(response);
}
@Test public void testObservableDoesNotLockExecutor() {
TestScheduler test = Schedulers.test();
rxSupport.createRequestObservable(callable)
.delay(1000, TimeUnit.MILLISECONDS, test)
.subscribe(subscriber);
rxSupport.createRequestObservable(callable)
.delay(2000, TimeUnit.MILLISECONDS, test)
.subscribe(subscriber);
// Nothing fired yet
verify(subscriber, never()).onNext(any());
// Subscriptions should of been queued up and executed even tho we delayed on the Subscriber.
executor.executeNextInQueue();
executor.executeNextInQueue();
verify(subscriber, never()).onNext(response);
test.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
verify(subscriber, times(1)).onNext(response);
test.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
verify(subscriber, times(2)).onNext(response);
}
@Test public void testObservableRespectsObserveOn() throws Exception {
TestScheduler observe = Schedulers.test();
rxSupport.createRequestObservable(callable)
.observeOn(observe)
.subscribe(subscriber);
verify(subscriber, never()).onNext(any());
executor.executeNextInQueue();
// Should have no response yet, but callback should of been executed.
verify(subscriber, never()).onNext(any());
verify(callable, times(1)).call();
// Forward the Observable Scheduler
observe.triggerActions();
verify(subscriber, times(1)).onNext(response);
}
/**
* Test Executor to iterate through Executions to aid in checking
* that the Observable implementation is correct.
*/
static class QueuedSynchronousExecutor implements Executor {
Deque<Runnable> runnableQueue = new ArrayDeque<Runnable>();
@Override public void execute(Runnable runnable) {
runnableQueue.add(runnable);
}
/**
* Will throw exception if you are expecting something to be added to the Executor
* and it hasn't.
*/
void executeNextInQueue() {
runnableQueue.removeFirst().run();
}
/**
* Executes any queued executions on the executor.
*/
void executeAll() {
Iterator<Runnable> iterator = runnableQueue.iterator();
while (iterator.hasNext()) {
Runnable next = iterator.next();
next.run();
iterator.remove();
}
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册