提交 27572712 编写于 作者: A Allen Wang

Added Netty examples.

上级 3423d0a6
......@@ -736,15 +736,17 @@ public class DefaultClientConfigImpl implements IClientConfig {
return propertyNameSpace;
}
public static DefaultClientConfigImpl getEmptyConfig() {
return new DefaultClientConfigImpl();
}
public static DefaultClientConfigImpl getClientConfigWithDefaultValues(String clientName) {
return getClientConfigWithDefaultValues(clientName, DEFAULT_PROPERTY_NAME_SPACE);
}
public static DefaultClientConfigImpl getClientConfigWithDefaultValues() {
DefaultClientConfigImpl config = new DefaultClientConfigImpl();
config.loadDefaultValues();
return config;
}
return getClientConfigWithDefaultValues("default", DEFAULT_PROPERTY_NAME_SPACE);
}
public static DefaultClientConfigImpl getClientConfigWithDefaultValues(String clientName, String nameSpace) {
......
......@@ -39,6 +39,7 @@ public abstract class ExampleAppWithLocalResource {
public abstract void run() throws Exception;
@edu.umd.cs.findbugs.annotations.SuppressWarnings
public final void runApp() throws Exception {
PackagesResourceConfig resourceConfig = new PackagesResourceConfig("com.netflix.ribbon.examples.server");
ExecutorService service = Executors.newFixedThreadPool(50);
......@@ -54,5 +55,6 @@ public abstract class ExampleAppWithLocalResource {
}
service.shutdownNow();
}
System.exit(0);
}
}
......@@ -39,10 +39,10 @@ public class AsyncStreamingClientApp extends ExampleAppWithLocalResource {
@Override
public void run() throws Exception {
HttpRequest request = HttpRequest.newBuilder().uri(SERVICE_URI + "testAsync/stream").build();
HttpRequest request = HttpRequest.newBuilder().uri(SERVICE_URI + "testAsync/customEvent").build();
AsyncHttpClient<ByteBuffer> client = AsyncHttpClientBuilder.withApacheAsyncClient().buildClient();
try {
Future<HttpResponse> response = client.execute(request, new SSEDecoder(), new ResponseCallback<HttpResponse, String>() {
Future<HttpResponse> response = client.execute(request, new LineEventDecoder(), new ResponseCallback<HttpResponse, String>() {
@Override
public void completed(HttpResponse response) {
}
......
......@@ -24,14 +24,14 @@ import com.netflix.client.StreamDecoder;
/**
* A {@link StreamDecoder} used by some sample application. This decoder decodes
* content of ByteBuffer into list of Server-Sent Event string.
* content of ByteBuffer into list of single line event string.
* <p>
* This code is copied from https://github.com/Netflix/RxJava/tree/master/rxjava-contrib/rxjava-apache-http
*
* @author awang
*
*/
public class SSEDecoder implements StreamDecoder<String, ByteBuffer> {
public class LineEventDecoder implements StreamDecoder<String, ByteBuffer> {
public String decode(ByteBuffer input) throws IOException {
if (input == null || !input.hasRemaining()) {
......
......@@ -40,13 +40,13 @@ public class StreamingObservableExample extends ExampleAppWithLocalResource {
@Override
public void run() throws Exception {
HttpRequest request = HttpRequest.newBuilder().uri(SERVICE_URI + "testAsync/stream").build();
HttpRequest request = HttpRequest.newBuilder().uri(SERVICE_URI + "testAsync/customEvent").build();
ObservableAsyncClient<HttpRequest, HttpResponse, ByteBuffer> observableClient =
AsyncHttpClientBuilder.withApacheAsyncClient().observableClient();
final AtomicReference<HttpResponse> httpResponse = new AtomicReference<HttpResponse>();
final AtomicInteger counter = new AtomicInteger();
try {
observableClient.stream(request, new SSEDecoder())
observableClient.stream(request, new LineEventDecoder())
.toBlockingObservable()
.forEach(new Action1<StreamEvent<HttpResponse, String>>() {
@Override
......
package com.netflix.ribbon.examples.netty.http;
import java.net.URI;
import java.util.Map;
import rx.util.functions.Action1;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig;
import com.netflix.client.config.IClientConfigKey;
import com.netflix.client.http.HttpRequest;
import com.netflix.client.http.UnexpectedHttpResponseException;
import com.netflix.client.netty.http.NettyHttpClient;
import com.netflix.client.netty.http.NettyHttpClientBuilder;
import com.netflix.ribbon.examples.ExampleAppWithLocalResource;
import com.netflix.ribbon.examples.server.ServerResources.Person;
import com.netflix.serialization.TypeDef;
public class EntityDeserializationExample extends ExampleAppWithLocalResource {
@Override
public void run() throws Exception {
URI uri = new URI(SERVICE_URI + "testAsync/person");
HttpRequest request = HttpRequest.newBuilder().uri(uri).build();
NettyHttpClient observableClient = NettyHttpClientBuilder.newBuilder()
.build();
// deserialize using the default Jackson deserializer
observableClient.createEntityObservable(request, TypeDef.fromClass(Person.class)).toBlockingObservable().forEach(new Action1<Person>() {
@Override
public void call(Person t1) {
try {
System.out.println("Person: " + t1);
} catch (Exception e) {
e.printStackTrace();
}
}
});
// deserialize as Map using the default Jackson deserializer
observableClient.createEntityObservable(request, new TypeDef<Map<String, Object>>(){})
.toBlockingObservable()
.forEach(new Action1<Map<String, Object>>() {
@Override
public void call(Map<String, Object> t1) {
try {
System.out.println("Map: " + t1);
} catch (Exception e) {
e.printStackTrace();
}
}
});
// deserialize using Xml deserializer
IClientConfig requestConfig = DefaultClientConfigImpl.getEmptyConfig()
.setPropertyWithType(IClientConfigKey.CommonKeys.Deserializer, new XmlCodec<Person>());
request = HttpRequest.newBuilder().uri(SERVICE_URI + "testAsync/getXml").build();
observableClient.createEntityObservable(request, TypeDef.fromClass(Person.class), requestConfig)
.toBlockingObservable()
.forEach(new Action1<Person>() {
@Override
public void call(Person t1) {
try {
System.out.println("Person: " + t1);
} catch (Exception e) {
e.printStackTrace();
}
}
});
// URI does not exist, will get UnexpectedResponseException
request = HttpRequest.newBuilder().uri(SERVICE_URI + "testAsync/NotFound").build();
observableClient.createEntityObservable(request, TypeDef.fromClass(Person.class))
.subscribe(new Action1<Person>() {
@Override
public void call(Person t1) {
try {
System.out.println("Person: " + t1);
} catch (Exception e) {
e.printStackTrace();
}
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable t1) {
if (t1 instanceof UnexpectedHttpResponseException) {
UnexpectedHttpResponseException ex = (UnexpectedHttpResponseException) t1;
System.out.println(ex.getStatusCode());
System.out.println(ex.getResponse().getStatusLine());
}
}
});
Thread.sleep(2000);
}
public static void main(String[] args) throws Exception {
new EntityDeserializationExample().runApp();
}
}
package com.netflix.ribbon.examples.netty.http;
import rx.util.functions.Action1;
import com.netflix.client.http.HttpRequest;
import com.netflix.client.http.HttpResponse;
import com.netflix.client.netty.http.NettyHttpClient;
import com.netflix.client.netty.http.NettyHttpClientBuilder;
import com.netflix.ribbon.examples.ExampleAppWithLocalResource;
import com.netflix.ribbon.examples.server.ServerResources.Person;
import com.netflix.serialization.TypeDef;
public class HttpResponseDeserialization extends ExampleAppWithLocalResource {
@Override
public void run() throws Exception {
HttpRequest request = HttpRequest.newBuilder().uri(SERVICE_URI + "testAsync/getXml").build();
NettyHttpClient observableClient = NettyHttpClientBuilder.newBuilder()
.build();
observableClient.createFullHttpResponseObservable(request)
.toBlockingObservable()
.forEach(new Action1<HttpResponse>() {
@Override
public void call(HttpResponse t1) {
try {
System.out.println(t1.getStatus());
System.out.println(t1.getEntity(TypeDef.fromClass(Person.class), new XmlCodec<Person>()));
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
public static void main(String[] args) throws Exception {
new HttpResponseDeserialization().runApp();
}
}
package com.netflix.ribbon.examples.netty.http;
import java.util.concurrent.CountDownLatch;
import rx.Observer;
import com.google.common.collect.Lists;
import com.netflix.client.http.HttpRequest;
import com.netflix.client.http.HttpResponse;
import com.netflix.client.netty.http.NettyHttpClientBuilder.NettyHttpLoadBalancingClientBuilder;
import com.netflix.client.netty.http.NettyHttpLoadBalancingClient;
import com.netflix.loadbalancer.AbstractLoadBalancer;
import com.netflix.loadbalancer.BaseLoadBalancer;
import com.netflix.loadbalancer.Server;
public class LoadBalancingExample {
public static void main(String[] args) throws Exception {
NettyHttpLoadBalancingClient client = NettyHttpLoadBalancingClientBuilder.newBuilder()
.withLoadBalancer(new BaseLoadBalancer())
.withInitialServerList(Lists.newArrayList(new Server("www.google.com:80"), new Server("www.microsoft.com:80"), new Server("www.yahoo.com:80")))
.build();
HttpRequest request = HttpRequest.newBuilder().uri("/").build();
final CountDownLatch latch = new CountDownLatch(3);
Observer<HttpResponse> observer = new Observer<HttpResponse>() {
@Override
public void onCompleted() {
latch.countDown();
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(HttpResponse args) {
System.out.println("Got response from: " + args.getRequestedURI());
}
};
for (int i = 0; i < 3; i++) {
client.observeHttpResponse(request, observer);
}
latch.await();
System.out.println(((AbstractLoadBalancer) client.getLoadBalancer()).getLoadBalancerStats());
}
}
package com.netflix.ribbon.examples.netty.http;
import java.net.URI;
import rx.util.functions.Action1;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfigKey;
import com.netflix.client.http.HttpRequest;
import com.netflix.client.http.HttpRequest.Verb;
import com.netflix.client.netty.http.NettyHttpClient;
import com.netflix.client.netty.http.NettyHttpClientBuilder;
import com.netflix.ribbon.examples.ExampleAppWithLocalResource;
import com.netflix.ribbon.examples.server.ServerResources.Person;
import com.netflix.serialization.TypeDef;
public class PostExample extends ExampleAppWithLocalResource {
@Override
public void run() throws Exception {
URI uri = new URI(SERVICE_URI + "testAsync/person");
Person myPerson = new Person("netty", 5);
HttpRequest request = HttpRequest.newBuilder().uri(uri).verb(Verb.POST).entity(myPerson).header("Content-type", "application/json").build();
NettyHttpClient observableClient = NettyHttpClientBuilder.newBuilder().withClientConfig(DefaultClientConfigImpl.getClientConfigWithDefaultValues()
.setPropertyWithType(IClientConfigKey.CommonKeys.ReadTimeout, 10000)
.setPropertyWithType(IClientConfigKey.CommonKeys.ConnectTimeout, 2000))
.build();
observableClient.createEntityObservable(request, TypeDef.fromClass(Person.class)).toBlockingObservable().forEach(new Action1<Person>() {
@Override
public void call(Person t1) {
try {
System.out.println(t1);
} catch (Exception e) { // NOPMD
}
}
});
}
public static void main(String[] args) throws Exception {
new PostExample().runApp();
}
}
package com.netflix.ribbon.examples.netty.http;
import io.reactivex.netty.protocol.http.ObservableHttpResponse;
import io.reactivex.netty.protocol.text.sse.SSEEvent;
import java.util.List;
import rx.Observable;
import rx.util.functions.Action1;
import rx.util.functions.Func1;
import com.google.common.collect.Lists;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig;
import com.netflix.client.http.HttpRequest;
import com.netflix.client.netty.http.NettyHttpClient;
import com.netflix.client.netty.http.NettyHttpClientBuilder;
import com.netflix.client.netty.http.ServerSentEvent;
import com.netflix.ribbon.examples.ExampleAppWithLocalResource;
import com.netflix.ribbon.examples.server.ServerResources.Person;
import com.netflix.serialization.JacksonCodec;
import com.netflix.serialization.TypeDef;
public class SeverSentEventExample extends ExampleAppWithLocalResource {
@Override
public void run() throws Exception {
// Get the events and parse each data line using Jackson deserializer
IClientConfig overrideConfig = new DefaultClientConfigImpl().setPropertyWithType(CommonClientConfigKey.Deserializer, JacksonCodec.getInstance());
HttpRequest request = HttpRequest.newBuilder().uri(SERVICE_URI + "testAsync/personStream")
.build();
NettyHttpClient observableClient = NettyHttpClientBuilder.newBuilder().build();
final List<Person> result = Lists.newArrayList();
observableClient.createServerSentEventEntityObservable(request, TypeDef.fromClass(Person.class), overrideConfig)
.subscribe(new Action1<ServerSentEvent<Person>>() {
@Override
public void call(ServerSentEvent<Person> t1) {
// System.out.println(t1);
result.add(t1.getEntity());
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable t1) {
t1.printStackTrace();
}
});
Thread.sleep(2000);
System.out.println(result);
// Get the events as raw string
request = HttpRequest.newBuilder().uri(SERVICE_URI + "testAsync/stream")
.build();
observableClient.createServerSentEventObservable(request)
.flatMap(new Func1<ObservableHttpResponse<SSEEvent>, Observable<SSEEvent>>() {
@Override
public Observable<SSEEvent> call(
ObservableHttpResponse<SSEEvent> t1) {
return t1.content();
}
}).toBlockingObservable()
.forEach(new Action1<SSEEvent>(){
@Override
public void call(SSEEvent t1) {
System.out.println(t1);
}
});
}
public static void main(String[] args) throws Exception {
new SeverSentEventExample().runApp();
}
}
package com.netflix.ribbon.examples.netty.http;
import rx.util.functions.Action1;
import com.netflix.client.http.HttpRequest;
import com.netflix.client.http.HttpResponse;
import com.netflix.client.netty.http.NettyHttpClient;
import com.netflix.client.netty.http.NettyHttpClientBuilder.NettyHttpLoadBalancingClientBuilder;
import com.netflix.serialization.StringDeserializer;
import com.netflix.serialization.TypeDef;
public class SimpleGet {
public static void main(String[] args) {
NettyHttpClient client = NettyHttpLoadBalancingClientBuilder.newBuilder()
.build();
HttpRequest request = HttpRequest.newBuilder()
.uri("http://www.google.com:80")
.build();
client.createFullHttpResponseObservable(request)
.toBlockingObservable()
.forEach(new Action1<HttpResponse>() {
@Override
public void call(HttpResponse t1) {
System.out.println("Status code: " + t1.getStatus());
try {
System.out.println("Response content: " + t1.getEntity(TypeDef.fromClass(String.class), StringDeserializer.getInstance()));
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
package com.netflix.ribbon.examples.netty.http;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import com.netflix.serialization.Deserializer;
import com.netflix.serialization.Serializer;
import com.netflix.serialization.TypeDef;
import com.thoughtworks.xstream.XStream;
public class XmlCodec<T extends Object> implements Serializer<T>, Deserializer<T> {
XStream xstream = new XStream();
@SuppressWarnings("unchecked")
@Override
public T deserialize(InputStream in, TypeDef<T> type)
throws IOException {
System.out.println("Deserializing using XStream");
return (T) xstream.fromXML(in);
}
@Override
public void serialize(OutputStream out, T object, TypeDef<?> type) throws IOException {
xstream.toXML(object, out);
}
}
......@@ -164,6 +164,42 @@ public class ServerResources {
};
}
@GET
@Path("/personStream")
@Produces("text/event-stream")
public StreamingOutput getPersonStream() {
return new StreamingOutput() {
@Override
public void write(OutputStream output) throws IOException,
WebApplicationException {
for (Person p: persons) {
String eventLine = "data: " + mapper.writeValueAsString(p)
+ "\n\n";
output.write(eventLine.getBytes("UTF-8"));
}
output.close();
}
};
}
@GET
@Path("/customEvent")
public StreamingOutput getCustomeEvents() {
return new StreamingOutput() {
@Override
public void write(OutputStream output) throws IOException,
WebApplicationException {
for (String line: streamContent) {
String eventLine = line + "\n";
output.write(eventLine.getBytes("UTF-8"));
}
output.close();
}
};
}
@GET
@Path("/getXml")
@Produces("application/xml")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册