提交 e9035c99 编写于 作者: A Allen Wang

Added streaming support for Observable.

上级 4b86cd5e
package com.netflix.client;
import java.util.concurrent.Future;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
......@@ -8,11 +10,29 @@ import rx.Observable.OnSubscribeFunc;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
public class ObservableAsyncClient<T extends ClientRequest, S extends ResponseWithTypedEntity> {
public class ObservableAsyncClient<T extends ClientRequest, S extends IResponse, U> {
public static class StreamEvent<U extends IResponse, E> {
private volatile U response;
private volatile E event;
private final AsyncClient<T, S, ?> client;
public StreamEvent(U response, E event) {
super();
this.response = response;
this.event = event;
}
public final U getResponse() {
return response;
}
public final E getEvent() {
return event;
}
}
private final AsyncClient<T, S, U> client;
public ObservableAsyncClient(AsyncClient<T, S, ?> client) {
public ObservableAsyncClient(AsyncClient<T, S, U> client) {
this.client = client;
}
......@@ -56,4 +76,60 @@ public class ObservableAsyncClient<T extends ClientRequest, S extends ResponseWi
}
});
}
public <E> Observable<StreamEvent<S, E>> stream(final T request, final StreamDecoder<E, U> decoder) {
final OnSubscribeFunc<StreamEvent<S, E>> onSubscribeFunc = new OnSubscribeFunc<StreamEvent<S, E>>() {
@Override
public Subscription onSubscribe(final
Observer<? super StreamEvent<S, E>> observer) {
final CompositeSubscription parentSubscription = new CompositeSubscription();
try {
Future<?> future = client.execute(request, (StreamDecoder<E, U>) decoder,
new ResponseCallback<S, E>() {
private volatile S response;
@Override
public void completed(S response) {
observer.onCompleted();
}
@Override
public void failed(Throwable e) {
observer.onError(e);
}
@Override
public void cancelled() {
observer.onError(new IllegalStateException("operation cancelled"));
}
@Override
public void responseReceived(S response) {
this.response = response;
}
@Override
public void contentReceived(E content) {
StreamEvent<S, E> e = new StreamEvent<S, E>(this.response, content);
observer.onNext(e);
}
}
);
parentSubscription.add(Subscriptions.from(future));
} catch (ClientException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return parentSubscription;
}
};
return Observable.create(new OnSubscribeFunc<StreamEvent<S, E>>() {
@Override
public Subscription onSubscribe(final Observer<? super StreamEvent<S, E>> observer) {
return onSubscribeFunc.onSubscribe(observer);
}
});
}
}
......@@ -32,6 +32,7 @@ import com.netflix.client.AsyncLoadBalancingClient;
import com.netflix.client.ClientException;
import com.netflix.client.FullResponseCallback;
import com.netflix.client.ObservableAsyncClient;
import com.netflix.client.ObservableAsyncClient.StreamEvent;
import com.netflix.client.ResponseCallback;
import com.netflix.client.StreamDecoder;
import com.netflix.client.config.CommonClientConfigKey;
......@@ -187,7 +188,7 @@ public class HttpAsyncClienTest {
public void testObservable() throws Exception {
URI uri = new URI(SERVICE_URI + "testNetty/person");
HttpRequest request = HttpRequest.newBuilder().uri(uri).build();
ObservableAsyncClient<HttpRequest, HttpResponse> observableClient = new ObservableAsyncClient<HttpRequest, HttpResponse>(client);
ObservableAsyncClient<HttpRequest, HttpResponse, ByteBuffer> observableClient = new ObservableAsyncClient<HttpRequest, HttpResponse, ByteBuffer>(client);
final List<Person> result = Lists.newArrayList();
observableClient.execute(request).toBlockingObservable().forEach(new Action1<HttpResponse>() {
@Override
......@@ -532,6 +533,25 @@ public class HttpAsyncClienTest {
assertEquals(EmbeddedResources.streamContent, results);
}
@Test
public void testStreamObservable() throws Exception {
HttpRequest request = HttpRequest.newBuilder().uri(SERVICE_URI + "testNetty/stream").build();
final List<String> results = Lists.newArrayList();
ObservableAsyncClient<HttpRequest, HttpResponse, ByteBuffer> observableClient =
new ObservableAsyncClient<HttpRequest, HttpResponse, ByteBuffer>(client);
observableClient.stream(request, new SSEDecoder())
.toBlockingObservable()
.forEach(new Action1<StreamEvent<HttpResponse, List<String>>>() {
@Override
public void call(final StreamEvent<HttpResponse, List<String>> t1) {
results.addAll(t1.getEvent());
}
});
assertEquals(EmbeddedResources.streamContent, results);
}
@Test
public void testStreamWithLoadBalancer() throws Exception {
AsyncLoadBalancingClient<HttpRequest, HttpResponse, ByteBuffer> loadBalancingClient = new AsyncLoadBalancingClient<HttpRequest,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册