From e9035c99a41a1f60117853be4adbe2367d3a9182 Mon Sep 17 00:00:00 2001 From: Allen Wang Date: Sat, 5 Oct 2013 11:48:32 -0700 Subject: [PATCH] Added streaming support for Observable. --- .../netflix/client/ObservableAsyncClient.java | 82 ++++++++++++++++++- .../httpasyncclient/HttpAsyncClienTest.java | 22 ++++- 2 files changed, 100 insertions(+), 4 deletions(-) diff --git a/ribbon-core/src/main/java/com/netflix/client/ObservableAsyncClient.java b/ribbon-core/src/main/java/com/netflix/client/ObservableAsyncClient.java index 194d130..c5f3893 100644 --- a/ribbon-core/src/main/java/com/netflix/client/ObservableAsyncClient.java +++ b/ribbon-core/src/main/java/com/netflix/client/ObservableAsyncClient.java @@ -1,6 +1,8 @@ package com.netflix.client; +import java.util.concurrent.Future; + import rx.Observable; import rx.Observer; import rx.Subscription; @@ -8,11 +10,29 @@ import rx.Observable.OnSubscribeFunc; import rx.subscriptions.CompositeSubscription; import rx.subscriptions.Subscriptions; -public class ObservableAsyncClient { +public class ObservableAsyncClient { + + public static class StreamEvent { + private volatile U response; + private volatile E event; - private final AsyncClient client; + public StreamEvent(U response, E event) { + super(); + this.response = response; + this.event = event; + } + + public final U getResponse() { + return response; + } + public final E getEvent() { + return event; + } + } + + private final AsyncClient client; - public ObservableAsyncClient(AsyncClient client) { + public ObservableAsyncClient(AsyncClient client) { this.client = client; } @@ -56,4 +76,60 @@ public class ObservableAsyncClient Observable> stream(final T request, final StreamDecoder decoder) { + final OnSubscribeFunc> onSubscribeFunc = new OnSubscribeFunc>() { + @Override + public Subscription onSubscribe(final + Observer> observer) { + final CompositeSubscription parentSubscription = new CompositeSubscription(); + try { + Future future = client.execute(request, (StreamDecoder) decoder, + new ResponseCallback() { + private volatile S response; + @Override + public void completed(S response) { + observer.onCompleted(); + } + + @Override + public void failed(Throwable e) { + observer.onError(e); + } + + @Override + public void cancelled() { + observer.onError(new IllegalStateException("operation cancelled")); + } + + @Override + public void responseReceived(S response) { + this.response = response; + } + + @Override + public void contentReceived(E content) { + StreamEvent e = new StreamEvent(this.response, content); + observer.onNext(e); + } + } + ); + parentSubscription.add(Subscriptions.from(future)); + + } catch (ClientException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return parentSubscription; + } + + }; + return Observable.create(new OnSubscribeFunc>() { + @Override + public Subscription onSubscribe(final Observer> observer) { + return onSubscribeFunc.onSubscribe(observer); + } + }); + } + } diff --git a/ribbon-httpasyncclient/src/test/java/com/netflix/httpasyncclient/HttpAsyncClienTest.java b/ribbon-httpasyncclient/src/test/java/com/netflix/httpasyncclient/HttpAsyncClienTest.java index 8c4a86f..d864f27 100644 --- a/ribbon-httpasyncclient/src/test/java/com/netflix/httpasyncclient/HttpAsyncClienTest.java +++ b/ribbon-httpasyncclient/src/test/java/com/netflix/httpasyncclient/HttpAsyncClienTest.java @@ -32,6 +32,7 @@ import com.netflix.client.AsyncLoadBalancingClient; import com.netflix.client.ClientException; import com.netflix.client.FullResponseCallback; import com.netflix.client.ObservableAsyncClient; +import com.netflix.client.ObservableAsyncClient.StreamEvent; import com.netflix.client.ResponseCallback; import com.netflix.client.StreamDecoder; import com.netflix.client.config.CommonClientConfigKey; @@ -187,7 +188,7 @@ public class HttpAsyncClienTest { public void testObservable() throws Exception { URI uri = new URI(SERVICE_URI + "testNetty/person"); HttpRequest request = HttpRequest.newBuilder().uri(uri).build(); - ObservableAsyncClient observableClient = new ObservableAsyncClient(client); + ObservableAsyncClient observableClient = new ObservableAsyncClient(client); final List result = Lists.newArrayList(); observableClient.execute(request).toBlockingObservable().forEach(new Action1() { @Override @@ -532,6 +533,25 @@ public class HttpAsyncClienTest { assertEquals(EmbeddedResources.streamContent, results); } + @Test + public void testStreamObservable() throws Exception { + HttpRequest request = HttpRequest.newBuilder().uri(SERVICE_URI + "testNetty/stream").build(); + final List results = Lists.newArrayList(); + ObservableAsyncClient observableClient = + new ObservableAsyncClient(client); + observableClient.stream(request, new SSEDecoder()) + .toBlockingObservable() + .forEach(new Action1>>() { + + @Override + public void call(final StreamEvent> t1) { + results.addAll(t1.getEvent()); + } + }); + assertEquals(EmbeddedResources.streamContent, results); + } + + @Test public void testStreamWithLoadBalancer() throws Exception { AsyncLoadBalancingClient loadBalancingClient = new AsyncLoadBalancingClient