提交 8c608ccd 编写于 作者: A Allen Wang

Implemented redirect using Observable APIs. Added JUnit tests. Retrofit...

Implemented redirect using Observable APIs. Added JUnit tests. Retrofit serialization framework to RestClient.
上级 1d2cc72c
......@@ -41,7 +41,7 @@ subprojects {
project(':ribbon-core') {
dependencies {
compile 'com.netflix.netflix-commons:netflix-statistics:0.1.1'
compile 'com.netflix.rxjava:rxjava-core:0.14.2'
compile 'com.netflix.rxjava:rxjava-core:0.16.1'
}
}
......@@ -65,6 +65,7 @@ project(':ribbon-rxnetty') {
dependencies {
compile project(':ribbon-core')
compile 'com.netflix.rxnetty:rx-netty:0.2.1'
testCompile 'com.google.mockwebserver:mockwebserver:20130706'
testCompile project(':ribbon-test')
}
}
......
......@@ -275,7 +275,49 @@ public abstract class LoadBalancerContext<T extends ClientRequest, S extends IRe
logger.error("Unexpected exception", ex);
}
}
/**
* This is called after an error is thrown from the client
* to update related stats.
*/
protected void noteError(ServerStats stats, ClientRequest request, Throwable e, long responseTime) {
try {
recordStats(stats, responseTime);
LoadBalancerErrorHandler<? super T, ? super S> errorHandler = getErrorHandler();
if (errorHandler != null && e != null) {
if (errorHandler.isCircuitTrippingException(e)) {
stats.incrementSuccessiveConnectionFailureCount();
stats.addToFailureCount();
} else {
stats.clearSuccessiveConnectionFailureCount();
}
}
} catch (Throwable ex) {
logger.error("Unexpected exception", ex);
}
}
/**
* This is called after a response is received from the client
* to update related stats.
*/
protected void noteResponse(ServerStats stats, ClientRequest request, Object response, long responseTime) {
try {
recordStats(stats, responseTime);
LoadBalancerErrorHandler<? super T, ? super S> errorHandler = getErrorHandler();
if (errorHandler != null && response != null) {
if (errorHandler.isCircuitTrippinResponse(response)) {
stats.incrementSuccessiveConnectionFailureCount();
stats.addToFailureCount();
} else {
stats.clearSuccessiveConnectionFailureCount();
}
}
} catch (Throwable ex) {
logger.error("Unexpected exception", ex);
}
}
/**
* This is usually called just before client execute a request.
*/
......
......@@ -792,7 +792,7 @@ public class DefaultClientConfigImpl implements IClientConfig {
@SuppressWarnings("unchecked")
@Override
public <T> T getTypedProperty(IClientConfigKey<T> key) {
public <T> T getPropertyWithType(IClientConfigKey<T> key) {
Object obj = properties.get(key.key());
Class<T> type = key.type();
try {
......@@ -800,14 +800,17 @@ public class DefaultClientConfigImpl implements IClientConfig {
} catch (ClassCastException e) {
if (obj instanceof String) {
String stringValue = (String) obj;
if (Integer.class.isAssignableFrom(type)) {
if (Integer.class.equals(type)) {
return (T) Integer.valueOf(stringValue);
} else if (Boolean.class.isAssignableFrom(type)) {
} else if (Boolean.class.equals(type)) {
return (T) Boolean.valueOf(stringValue);
} else if (Float.class.isAssignableFrom(type)) {
} else if (Float.class.equals(type)) {
return (T) Float.valueOf(stringValue);
} else if (Long.class.isAssignableFrom(type)) {
} else if (Long.class.equals(type)) {
return (T) Long.valueOf(stringValue);
} else if (Double.class.equals(type)) {
return (T) Double.valueOf(stringValue);
}
throw new IllegalArgumentException("Unable to convert string value to desired type " + type);
} else {
......@@ -817,8 +820,17 @@ public class DefaultClientConfigImpl implements IClientConfig {
}
@Override
public <T> IClientConfig setTypedProperty(IClientConfigKey<T> key, T value) {
public <T> IClientConfig setPropertyWithType(IClientConfigKey<T> key, T value) {
properties.put(key.key(), value);
return this;
}
@Override
public <T> T getPropertyWithType(IClientConfigKey<T> key, T defaultValue) {
T value = getPropertyWithType(key);
if (value == null) {
value = defaultValue;
}
return value;
}
}
......@@ -40,8 +40,16 @@ public interface IClientConfig {
public Map<String, Object> getProperties();
/**
* @deprecated use {@link #setPropertyWithType(IClientConfigKey, Object)}
*/
@Deprecated
public void setProperty(IClientConfigKey key, Object value);
/**
* @deprecated use {@link #getPropertyWithType(IClientConfigKey)}
*/
@Deprecated
public Object getProperty(IClientConfigKey key);
public Object getProperty(IClientConfigKey key, Object defaultVal);
......@@ -60,11 +68,17 @@ public interface IClientConfig {
public boolean getPropertyAsBoolean(IClientConfigKey key, boolean defaultValue);
/**
* Returns a typed property. This property must be set by {@link #setTypedProperty(IClientConfigKey, Object)}.
* If the property of IClientConfigKey is not set, or is set by {@link #loadProperties(String)} or {@link #setProperty(IClientConfigKey, Object)},
* this will return null.
* Returns a typed property. This property must be set by {@link #setPropertyWithType(IClientConfigKey, Object)}.
* If the property of IClientConfigKey is not set, it returns null.
*/
public <T> T getTypedProperty(IClientConfigKey<T> key);
public <T> T getPropertyWithType(IClientConfigKey<T> key);
/**
* Returns a typed property. This property must be set by {@link #setPropertyWithType(IClientConfigKey, Object)}.
* If the property of IClientConfigKey is not set, it returns the default value passed in as the parameter.
*/
public <T> T getPropertyWithType(IClientConfigKey<T> key, T defaultValue);
public <T> IClientConfig setTypedProperty(IClientConfigKey<T> key, T value);
public <T> IClientConfig setPropertyWithType(IClientConfigKey<T> key, T value);
}
......@@ -70,6 +70,13 @@ public class HttpRequest extends ClientRequest {
private HttpRequest request = new HttpRequest();
public Builder() {
}
public Builder(HttpRequest request) {
this.request = request;
}
public Builder uri(URI uri) {
request.setUri(uri);
return this;
......@@ -109,10 +116,20 @@ public class HttpRequest extends ClientRequest {
return this;
}
/**
* @deprecated see {@link #queryParam(String, String)}
*/
@Deprecated
public Builder queryParams(String name, String value) {
request.queryParams.put(name, value);
return this;
}
public Builder queryParam(String name, String value) {
request.queryParams.put(name, value);
return this;
}
public Builder entity(Object entity, TypeDef<?> entityType) {
request.entity = entity;
......@@ -186,6 +203,11 @@ public class HttpRequest extends ClientRequest {
public static Builder newBuilder() {
return new Builder();
}
public static Builder newBuilder(HttpRequest toCopy) {
return new Builder(toCopy);
}
/**
* Return a new instance of HttpRequest replacing the URI.
......
......@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.Map;
import com.netflix.client.ResponseWithTypedEntity;
import com.netflix.serialization.SerializationUtils;
import com.netflix.serialization.TypeDef;
/**
......@@ -37,8 +38,14 @@ public interface HttpResponse extends ResponseWithTypedEntity, Closeable {
*/
public int getStatus();
/**
* Get the reason phrase of HTTP status
*/
public String getStatusLine();
/**
* @see #getHttpHeaders()
*/
@Override
@Deprecated
public Map<String, Collection<String>> getHeaders();
......@@ -49,9 +56,15 @@ public interface HttpResponse extends ResponseWithTypedEntity, Closeable {
public InputStream getInputStream();
/**
* @deprecated See APIs in {@link SerializationUtils} to deserialize HTTP content
*/
@Deprecated
public <T> T getEntity(Class<T> type) throws Exception;
/**
* @deprecated See APIs in {@link SerializationUtils} to deserialize HTTP content
*/
@Deprecated
public <T> T getEntity(TypeDef<T> type) throws Exception;
}
......@@ -19,12 +19,10 @@ package com.netflix.serialization;
public class JacksonSerializationFactory implements SerializationFactory<HttpSerializationContext>{
public static final JacksonCodec instance = JacksonCodec.getInstance();
@Override
public <T extends Object> Deserializer<T> getDeserializer(HttpSerializationContext key, TypeDef<T> typeDef) {
if (key.getContentType().equalsIgnoreCase("application/json")) {
return instance;
return JacksonCodec.getInstance();
}
return null;
}
......@@ -32,7 +30,7 @@ public class JacksonSerializationFactory implements SerializationFactory<HttpSer
@Override
public <T> Serializer<T> getSerializer(HttpSerializationContext key, TypeDef<T> typeDef) {
if (key.getContentType().equalsIgnoreCase("application/json")) {
return instance;
return JacksonCodec.getInstance();
}
return null;
}
......
package com.netflix.serialization;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import com.netflix.client.config.CommonClientConfigKey;
......@@ -16,19 +17,29 @@ public class SerializationUtils {
return deserializer.deserialize(in, typeDef);
}
public static <T> String serializeToString(Serializer<T> serializer, T obj, TypeDef<?> typeDef)
throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
serializer.serialize(out, obj, typeDef);
return new String(out.toByteArray(), "UTF-8");
}
public static <T> T getEntity(HttpResponse response, TypeDef<T> type, Deserializer<T> deserializer) throws IOException {
return deserializer.deserialize(response.getInputStream(), type);
}
public static <T> Deserializer<T> getDeserializer(HttpRequest request, HttpHeaders responseHeaders, TypeDef<T> typeDef, SerializationFactory<HttpSerializationContext> serializationFactory) {
public static <T> Deserializer<T> getDeserializer(HttpRequest request, HttpHeaders responseHeaders, TypeDef<T> typeDef,
SerializationFactory<HttpSerializationContext> serializationFactory) {
Deserializer<T> deserializer = null;
if (request.getOverrideConfig() != null) {
deserializer = request.getOverrideConfig().getTypedProperty(CommonClientConfigKey.Deserializer);
deserializer = (Deserializer<T>) request.getOverrideConfig().getPropertyWithType(CommonClientConfigKey.Deserializer);
}
if (deserializer == null && serializationFactory != null) {
deserializer = serializationFactory.getDeserializer(new HttpSerializationContext(responseHeaders, request.getUri()), typeDef);
}
if (deserializer == null && typeDef.getRawType().equals(String.class)) {
return (Deserializer<T>) StringDeserializer.getInstance();
}
return deserializer;
}
}
......@@ -9,6 +9,16 @@ import com.google.common.io.CharStreams;
import com.google.common.io.Closeables;
public class StringDeserializer implements Deserializer<String> {
private static final StringDeserializer instance = new StringDeserializer();
private StringDeserializer() {
}
public static final StringDeserializer getInstance() {
return instance;
}
@Override
public String deserialize(InputStream in, TypeDef<String> type)
throws IOException {
......
......@@ -22,9 +22,9 @@ public class DefaultClientConfigImplTest {
DefaultClientConfigImpl config = new DefaultClientConfigImpl();
config.loadProperties("myclient");
assertEquals("1000", config.getProperty(CommonClientConfigKey.ConnectTimeout));
assertEquals(1000, config.getTypedProperty(CommonClientConfigKey.ConnectTimeout).intValue());
config.setTypedProperty(CommonClientConfigKey.ConnectTimeout, 2000);
assertEquals(2000, config.getTypedProperty(CommonClientConfigKey.ConnectTimeout).intValue());
assertEquals(1000, config.getPropertyWithType(CommonClientConfigKey.ConnectTimeout).intValue());
config.setPropertyWithType(CommonClientConfigKey.ConnectTimeout, 2000);
assertEquals(2000, config.getPropertyWithType(CommonClientConfigKey.ConnectTimeout).intValue());
}
@Test
......
......@@ -31,10 +31,16 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.netflix.client.ClientException;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.IClientConfig;
import com.netflix.client.http.HttpHeaders;
import com.netflix.client.http.HttpRequest;
import com.netflix.client.http.HttpResponse;
import com.netflix.serialization.Deserializer;
import com.netflix.serialization.Serializer;
import com.netflix.serialization.TypeDef;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.GenericType;
import com.sun.jersey.api.client.UniformInterfaceException;
/**
......@@ -45,15 +51,17 @@ import com.sun.jersey.api.client.UniformInterfaceException;
*/
class HttpClientResponse implements HttpResponse {
private ClientResponse bcr = null;
private URI requestedURI; // the request url that got this response
private Multimap<String, String> headers = ArrayListMultimap.<String, String>create();
private HttpHeaders httpHeaders;
public HttpClientResponse(ClientResponse cr){
private final ClientResponse bcr;
private final Multimap<String, String> headers = ArrayListMultimap.<String, String>create();
private final HttpHeaders httpHeaders;
private final URI requestedURI;
private final IClientConfig overrideConfig;
public HttpClientResponse(ClientResponse cr, URI requestedURI, IClientConfig config){
bcr = cr;
this.requestedURI = requestedURI;
this.overrideConfig = config;
for (Map.Entry<String, List<String>> entry: bcr.getHeaders().entrySet()) {
if (entry.getKey() != null && entry.getValue() != null) {
headers.putAll(entry.getKey(), entry.getValue());
......@@ -101,13 +109,7 @@ class HttpClientResponse implements HttpResponse {
public <T> T getEntity(Class<T> c) throws Exception {
T t = null;
try {
t = this.bcr.getEntity(c);
} catch (UniformInterfaceException e) {
throw new ClientException(ClientException.ErrorType.GENERAL, e.getMessage(), e.getCause());
}
return t;
return getEntity(TypeDef.fromClass(c));
}
@Override
......@@ -133,13 +135,9 @@ class HttpClientResponse implements HttpResponse {
@Override
public URI getRequestedURI() {
return this.requestedURI;
return requestedURI;
}
public void setRequestedURI(URI requestedURI) {
this.requestedURI = requestedURI;
}
@Override
public Object getPayload() throws ClientException {
if (hasEntity()) {
......@@ -166,7 +164,19 @@ class HttpClientResponse implements HttpResponse {
@SuppressWarnings("unchecked")
@Override
public <T> T getEntity(TypeDef<T> type) throws Exception {
return (T) getEntity(type.getRawType());
if (overrideConfig != null) {
Deserializer deserializer = overrideConfig.getPropertyWithType(CommonClientConfigKey.Deserializer, null);
if (deserializer != null) {
return (T) deserializer.deserialize(getInputStream(), type);
}
}
T t = null;
try {
t = this.bcr.getEntity(new GenericType<T>(type.getType()){});
} catch (UniformInterfaceException e) {
throw new ClientException(ClientException.ErrorType.GENERAL, e.getMessage(), e.getCause());
}
return t;
}
@Override
......
......@@ -72,6 +72,9 @@ import com.netflix.niws.cert.AbstractSslContextFactory;
import com.netflix.niws.client.ClientSslSocketFactoryException;
import com.netflix.niws.client.URLSslContextFactory;
import com.netflix.niws.client.http.HttpClientRequest.Verb;
import com.netflix.serialization.SerializationUtils;
import com.netflix.serialization.Serializer;
import com.netflix.serialization.TypeDef;
import com.netflix.util.Pair;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
......@@ -507,7 +510,7 @@ public class RestClient extends AbstractLoadBalancerAwareClient<HttpRequest, Htt
@Override
public HttpResponse execute(HttpRequest task) throws Exception {
return execute(task.getVerb(), task.getUri(),
task.getHeaders(), task.getQueryParams(), task.getOverrideConfig(), task.getEntity());
task.getHeaders(), task.getQueryParams(), task.getOverrideConfig(), task.getEntity(), task.getEntityType());
}
......@@ -557,7 +560,7 @@ public class RestClient extends AbstractLoadBalancerAwareClient<HttpRequest, Htt
private HttpResponse execute(HttpRequest.Verb verb, URI uri,
Map<String, Collection<String>> headers, Map<String, Collection<String>> params,
IClientConfig overriddenClientConfig, Object requestEntity) throws Exception {
IClientConfig overriddenClientConfig, Object requestEntity, TypeDef<?> entityType) throws Exception {
HttpClientResponse thisResponse = null;
boolean bbFollowRedirects = bFollowRedirects;
// read overriden props
......@@ -597,18 +600,28 @@ public class RestClient extends AbstractLoadBalancerAwareClient<HttpRequest, Htt
}
}
}
Object entity = requestEntity;
if (overriddenClientConfig != null) {
Serializer serializer = overriddenClientConfig.getPropertyWithType(CommonClientConfigKey.Serializer, null);
if (serializer != null) {
String content = SerializationUtils.serializeToString(serializer, entity,
entityType == null ? TypeDef.fromClass(entity.getClass()) : entityType);
entity = content;
}
}
switch (verb) {
case GET:
jerseyResponse = b.get(ClientResponse.class);
break;
case POST:
jerseyResponse = b.post(ClientResponse.class, requestEntity);
jerseyResponse = b.post(ClientResponse.class, entity);
break;
case PUT:
jerseyResponse = b.put(ClientResponse.class, requestEntity);
jerseyResponse = b.put(ClientResponse.class, entity);
break;
case DELETE:
jerseyResponse = b.delete(ClientResponse.class, requestEntity);
jerseyResponse = b.delete(ClientResponse.class, entity);
break;
case HEAD:
jerseyResponse = b.head();
......@@ -622,8 +635,7 @@ public class RestClient extends AbstractLoadBalancerAwareClient<HttpRequest, Htt
"You have to one of the REST verbs such as GET, POST etc.");
}
thisResponse = new HttpClientResponse(jerseyResponse);
thisResponse.setRequestedURI(uri);
thisResponse = new HttpClientResponse(jerseyResponse, uri, overriddenClientConfig);
if (thisResponse.getStatus() == 503){
thisResponse.close();
throw new ClientException(ClientException.ErrorType.SERVER_THROTTLED);
......
......@@ -31,9 +31,14 @@ import org.junit.BeforeClass;
import org.junit.Test;
import com.netflix.client.ClientFactory;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig;
import com.netflix.client.http.HttpRequest;
import com.netflix.client.http.HttpResponse;
import com.netflix.client.http.HttpRequest.Verb;
import com.netflix.serialization.JacksonCodec;
import com.netflix.serialization.Serializer;
import com.sun.jersey.api.container.httpserver.HttpServerFactory;
import com.sun.jersey.api.core.PackagesResourceConfig;
import com.sun.jersey.core.util.MultivaluedMapImpl;
......@@ -75,6 +80,18 @@ public class GetPostTest {
assertEquals(200, response.getStatus());
assertTrue(response.getEntity(TestObject.class).name.equals("test"));
}
@Test
public void testGetJson() throws Exception {
URI getUri = new URI(SERVICE_URI + "test/getJsonObject");
MultivaluedMapImpl params = new MultivaluedMapImpl();
params.add("name", "test");
IClientConfig overrideConfig = new DefaultClientConfigImpl().setPropertyWithType(CommonClientConfigKey.Deserializer, JacksonCodec.getInstance());
HttpRequest request = HttpRequest.newBuilder().uri(getUri).queryParam("name", "test").overrideConfig(overrideConfig).build();
HttpResponse response = client.execute(request);
assertEquals(200, response.getStatus());
assertTrue(response.getEntity(TestObject.class).name.equals("test"));
}
@Test
public void testPost() throws Exception {
......@@ -87,6 +104,25 @@ public class GetPostTest {
assertTrue(response.getEntity(TestObject.class).name.equals("fromClient"));
}
@Test
public void testPostWithJson() throws Exception {
URI getUri = new URI(SERVICE_URI + "test/setJsonObject");
TestObject obj = new TestObject();
obj.name = "fromClient";
IClientConfig overrideConfig = new DefaultClientConfigImpl().setPropertyWithType(CommonClientConfigKey.Serializer, JacksonCodec.getInstance())
.setPropertyWithType(CommonClientConfigKey.Deserializer, JacksonCodec.getInstance());
HttpRequest request = HttpRequest.newBuilder().verb(Verb.POST)
.uri(getUri)
.entity(obj)
.overrideConfig(overrideConfig)
.header("Content-Type", "application/json")
.build();
HttpResponse response = client.execute(request);
assertEquals(200, response.getStatus());
assertTrue(response.getEntity(TestObject.class).name.equals("fromClient"));
}
@Test
public void testChunkedEncoding() throws Exception {
String obj = "chunked encoded content";
......
......@@ -30,6 +30,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.commons.io.IOUtils;
import org.codehaus.jackson.map.ObjectMapper;
@Produces({"application/xml"})
@Path("/test")
......@@ -42,7 +43,19 @@ public class TestResource {
obj.name = name;
return Response.ok(obj).build();
}
@Path("/getJsonObject")
@Produces("application/json")
@GET
public Response getJsonObject(@QueryParam ("name") String name) throws Exception {
TestObject obj = new TestObject();
obj.name = name;
ObjectMapper mapper = new ObjectMapper();
String value = mapper.writeValueAsString(obj);
return Response.ok(value).build();
}
@Path("/setObject")
@POST
@Consumes(MediaType.APPLICATION_XML)
......@@ -50,6 +63,15 @@ public class TestResource {
return Response.ok(obj).build();
}
@Path("/setJsonObject")
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response setJsonObject(String obj) throws Exception {
System.out.println("Get json string " + obj);
return Response.ok(obj).build();
}
@POST
@Path("/postStream")
@Consumes( { MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_XML})
......
......@@ -35,7 +35,6 @@ public class HttpEntityDecoder<T> extends MessageToMessageDecoder<FullHttpRespon
List<Object> out) throws Exception {
int statusCode = msg.getStatus().code();
if (type.getRawType().isAssignableFrom(HttpResponse.class)) {
msg.content().retain();
out.add(new NettyHttpResponse(msg, msg.content(), serializationFactory, request));
} else if (statusCode >= 200 && statusCode < 300) {
if (msg.content().isReadable()) {
......
......@@ -8,9 +8,7 @@ import io.netty.handler.codec.http.HttpResponseStatus;
import java.io.InputStream;
import java.net.URI;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
......@@ -33,6 +31,9 @@ class NettyHttpResponse implements ResponseWithTypedEntity, com.netflix.client.h
public NettyHttpResponse(HttpResponse response, ByteBuf content, SerializationFactory<HttpSerializationContext> serializationFactory, HttpRequest request) {
this.response = response;
this.content = content;
if (content != null) {
content.retain();
}
this.serializationFactory = serializationFactory;
this.request = request;
}
......
package com.netflix.client.netty.http;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelOption;
......@@ -32,9 +33,12 @@ import com.netflix.client.ClientException;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig;
import com.netflix.client.config.IClientConfigKey;
import com.netflix.client.http.HttpRequest;
import com.netflix.client.http.HttpResponse;
import com.netflix.client.http.UnexpectedHttpResponseException;
import com.netflix.client.http.HttpRequest.Builder;
import com.netflix.client.http.HttpRequest.Verb;
import com.netflix.serialization.Deserializer;
import com.netflix.serialization.HttpSerializationContext;
import com.netflix.serialization.JacksonSerializationFactory;
......@@ -52,45 +56,47 @@ public class RxNettyHttpClient {
private IClientConfig config;
public RxNettyHttpClient() {
this(DefaultClientConfigImpl.getClientConfigWithDefaultValues(), new JacksonSerializationFactory());
this(DefaultClientConfigImpl.getClientConfigWithDefaultValues(), new JacksonSerializationFactory(),
new Bootstrap().group(new NioEventLoopGroup()));
}
public RxNettyHttpClient(IClientConfig config) {
this(config, new JacksonSerializationFactory());
this(config, new JacksonSerializationFactory(), new Bootstrap().group(new NioEventLoopGroup()));
}
public RxNettyHttpClient(IClientConfig config, SerializationFactory<HttpSerializationContext> serializationFactory) {
public RxNettyHttpClient(IClientConfig config, SerializationFactory<HttpSerializationContext> serializationFactory,
Bootstrap bootStrap) {
this.config = config;
this.connectTimeout = config.getPropertyAsInteger(CommonClientConfigKey.ConnectTimeout, DefaultClientConfigImpl.DEFAULT_CONNECT_TIMEOUT);
this.readTimeout = config.getPropertyAsInteger(CommonClientConfigKey.ReadTimeout, DefaultClientConfigImpl.DEFAULT_READ_TIMEOUT);
this.serializationFactory = serializationFactory;
this.observableClient = ObservableHttpClient.newBuilder()
.withChannelOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
.build(new NioEventLoopGroup());
.build(bootStrap.group());
}
private class SingleEntityHandler<T> extends HttpProtocolHandlerAdapter<T> {
private HttpEntityDecoder<T> decoder;
private HttpRequest request;
private TypeDef<T> typeDef;
private SingleEntityHandler(HttpRequest request, SerializationFactory<HttpSerializationContext> serializationFactory, TypeDef<T> typeDef) {
decoder = new HttpEntityDecoder<T>(serializationFactory, request, typeDef);
this.request = request;
this.typeDef = typeDef;
}
@Override
public void configure(ChannelPipeline pipeline) {
int timeout = readTimeout;
if (request.getOverrideConfig() != null) {
Integer overrideTimeout = request.getOverrideConfig().getTypedProperty(CommonClientConfigKey.ReadTimeout);
Integer overrideTimeout = request.getOverrideConfig().getPropertyWithType(CommonClientConfigKey.ReadTimeout);
if (overrideTimeout != null) {
timeout = overrideTimeout.intValue();
}
}
pipeline.addAfter("http-response-decoder", "http-aggregator", new HttpObjectAggregator(Integer.MAX_VALUE));
pipeline.addAfter("http-aggregator", SelfRemovingResponseTimeoutHandler.NAME, new SelfRemovingResponseTimeoutHandler(timeout, TimeUnit.MILLISECONDS));
pipeline.addAfter(SelfRemovingResponseTimeoutHandler.NAME, "entity-decoder", decoder);
pipeline.addAfter(SelfRemovingResponseTimeoutHandler.NAME, "entity-decoder", new HttpEntityDecoder<T>(serializationFactory, request, typeDef));
}
}
......@@ -110,27 +116,36 @@ public class RxNettyHttpClient {
uri = encoder.toString();
}
if (entity != null) {
Serializer serializer = null;
if (request.getOverrideConfig() != null) {
serializer = request.getOverrideConfig().getTypedProperty(CommonClientConfigKey.Serializer);
}
if (serializer == null) {
HttpSerializationContext key = new HttpSerializationContext(request.getHttpHeaders(), request.getUri());
serializer = serializationFactory.getSerializer(key, request.getEntityType());
}
if (serializer == null) {
throw new ClientException("Unable to find serializer");
}
ByteArrayOutputStream bout = new ByteArrayOutputStream();
try {
serializer.serialize(bout, entity, request.getEntityType());
} catch (IOException e) {
throw new ClientException("Error serializing entity in request", e);
ByteBuf buf = null;
int contentLength = -1;
if (entity instanceof ByteBuf) {
buf = (ByteBuf) entity;
} else {
Serializer serializer = null;
if (request.getOverrideConfig() != null) {
serializer = request.getOverrideConfig().getPropertyWithType(CommonClientConfigKey.Serializer);
}
if (serializer == null) {
HttpSerializationContext key = new HttpSerializationContext(request.getHttpHeaders(), request.getUri());
serializer = serializationFactory.getSerializer(key, request.getEntityType());
}
if (serializer == null) {
throw new ClientException("Unable to find serializer");
}
ByteArrayOutputStream bout = new ByteArrayOutputStream();
try {
serializer.serialize(bout, entity, request.getEntityType());
} catch (IOException e) {
throw new ClientException("Error serializing entity in request", e);
}
byte[] content = bout.toByteArray();
buf = Unpooled.wrappedBuffer(content);
contentLength = content.length;
}
byte[] content = bout.toByteArray();
ByteBuf buf = Unpooled.wrappedBuffer(content);
r = new ValidatedFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(request.getVerb().name()), uri, buf);
r.headers().set(HttpHeaders.Names.CONTENT_LENGTH, content.length);
if (contentLength >= 0) {
r.headers().set(HttpHeaders.Names.CONTENT_LENGTH, contentLength);
}
} else {
r = new ValidatedFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(request.getVerb().name()), uri);
}
......@@ -149,6 +164,10 @@ public class RxNettyHttpClient {
return config;
}
public SerializationFactory<HttpSerializationContext> getSerializationFactory() {
return serializationFactory;
}
public <T> Observable<ServerSentEvent<T>> createServerSentEventEntityObservable(final HttpRequest request, final TypeDef<T> typeDef) {
return createServerSentEventObservable(request)
.flatMap(new Func1<ObservableHttpResponse<Message>, Observable<ServerSentEvent<T>>>() {
......@@ -195,15 +214,63 @@ public class RxNettyHttpClient {
});
}
private <T> T getProperty(IClientConfigKey<T> key, IClientConfig overrideConfig) {
T value = null;
if (overrideConfig != null) {
value = overrideConfig.getPropertyWithType(key);
}
if (value == null) {
value = getConfig().getPropertyWithType(key);
}
return value;
}
private static class RedirectException extends RuntimeException {
public RedirectException(String message) {
super(message);
}
}
public <T> Observable<ObservableHttpResponse<T>> createObservableHttpResponse(final HttpRequest request, HttpProtocolHandler<T> protocolHandler) {
public <T> Observable<ObservableHttpResponse<T>> createObservableHttpResponse(final HttpRequest request, final HttpProtocolHandler<T> protocolHandler) {
ValidatedFullHttpRequest r = null;
try {
r = getHttpRequest(request);
} catch (final Exception e) {
return Observable.error(e);
}
return observableClient.execute(r, protocolHandler);
final Observable<ObservableHttpResponse<T>> observable = observableClient.execute(r, protocolHandler);
Boolean followRedirect = getProperty(CommonClientConfigKey.FollowRedirects, request.getOverrideConfig());
if (followRedirect != null && followRedirect.booleanValue()) {
return observable.flatMap(new Func1<ObservableHttpResponse<T>, Observable<ObservableHttpResponse<T>>>() {
@Override
public Observable<ObservableHttpResponse<T>> call(
ObservableHttpResponse<T> t1) {
int statusCode = t1.response().getStatus().code();
switch (statusCode) {
case 301:
case 302:
case 303:
case 307:
case 308:
String location = t1.response().headers().get("Location");
if (location == null) {
return Observable.error(new Exception("Location header is not set in the redirect response"));
}
Builder builder = HttpRequest.newBuilder(request).uri(location);
if (statusCode == 303) {
// according to the spec, this must be done with GET
builder.verb(Verb.GET);
}
Observable<ObservableHttpResponse<T>> newObservable = createObservableHttpResponse(builder.build(), protocolHandler);
return newObservable;
default: break;
}
return Observable.from(t1);
}
});
} else {
return observable;
}
}
}
package com.netflix.client.netty.http;
import io.netty.bootstrap.Bootstrap;
import rx.Observable;
import rx.netty.protocol.http.HttpProtocolHandler;
import rx.netty.protocol.http.ObservableHttpResponse;
......@@ -7,28 +8,38 @@ import rx.netty.protocol.http.ObservableHttpResponse;
import com.netflix.client.LoadBalancerErrorHandler;
import com.netflix.client.LoadBalancerObservableRequest;
import com.netflix.client.LoadBalancerObservables;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig;
import com.netflix.client.http.HttpRequest;
import com.netflix.client.http.HttpResponse;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.Server;
import com.netflix.loadbalancer.ServerStats;
import com.netflix.serialization.HttpSerializationContext;
import com.netflix.serialization.JacksonSerializationFactory;
import com.netflix.serialization.SerializationFactory;
public class RxNettyHttpLoadBalancingClient extends RxNettyHttpClient {
private LoadBalancerObservables<HttpRequest, HttpResponse> lbObservables;
public RxNettyHttpLoadBalancingClient(IClientConfig config, LoadBalancerErrorHandler<HttpRequest, HttpResponse> errorHandler) {
public RxNettyHttpLoadBalancingClient() {
this(DefaultClientConfigImpl.getClientConfigWithDefaultValues());
}
public RxNettyHttpLoadBalancingClient(IClientConfig config) {
super(config);
lbObservables = new LoadBalancerObservables<HttpRequest, HttpResponse>(config);
lbObservables.setErrorHandler(new NettyHttpLoadBalancerErrorHandler());
}
public RxNettyHttpLoadBalancingClient(IClientConfig config, LoadBalancerErrorHandler<HttpRequest, HttpResponse> errorHandler,
SerializationFactory<HttpSerializationContext> serializationFactory, Bootstrap bootStrap) {
super(config, serializationFactory, bootStrap);
this.lbObservables = new LoadBalancerObservables<HttpRequest, HttpResponse>(config);
lbObservables.setErrorHandler(errorHandler);
}
public RxNettyHttpLoadBalancingClient(RxNettyHttpClient client) {
this.lbObservables = new LoadBalancerObservables<HttpRequest, HttpResponse>(client.getConfig());
lbObservables.setErrorHandler(new NettyHttpLoadBalancerErrorHandler());
}
private <T> Observable<ObservableHttpResponse<T>> executeSuper(final HttpRequest request, HttpProtocolHandler<T> protocolHandler) {
return super.createObservableHttpResponse(request, protocolHandler);
}
......
......@@ -17,6 +17,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.junit.BeforeClass;
import org.junit.Test;
......@@ -25,6 +27,8 @@ import rx.util.functions.Action1;
import rx.util.functions.Func1;
import com.google.common.collect.Lists;
import com.google.mockwebserver.MockResponse;
import com.google.mockwebserver.MockWebServer;
import com.netflix.client.ClientException;
import com.netflix.client.LoadBalancerObservables;
import com.netflix.client.config.CommonClientConfigKey;
......@@ -64,7 +68,7 @@ public class NettyClientTest {
PackagesResourceConfig resourceConfig = new PackagesResourceConfig("com.netflix.ribbon.test.resources");
port = (new Random()).nextInt(1000) + 4000;
SERVICE_URI = "http://localhost:" + port + "/";
ExecutorService service = Executors.newFixedThreadPool(200);
ExecutorService service = Executors.newFixedThreadPool(20);
try{
server = HttpServerFactory.create(SERVICE_URI, resourceConfig);
server.setExecutor(service);
......@@ -73,6 +77,7 @@ public class NettyClientTest {
e.printStackTrace();
fail("Unable to start server");
}
// LogManager.getRootLogger().setLevel((Level)Level.DEBUG);
}
@Test
......@@ -93,12 +98,40 @@ public class NettyClientTest {
});
assertEquals(Lists.newArrayList(EmbeddedResources.defaultPerson), result);
}
@Test
public void testRedirect() throws Exception {
URI uri = new URI(SERVICE_URI + "testAsync/redirect");
HttpRequest request = HttpRequest.newBuilder().uri(uri).queryParam("port", String.valueOf(port)).build();
RxNettyHttpClient observableClient = new RxNettyHttpClient(DefaultClientConfigImpl.getClientConfigWithDefaultValues().setPropertyWithType(CommonClientConfigKey.ReadTimeout, 1000000));
final List<Person> result = Lists.newArrayList();
observableClient.createEntityObservable(request, TypeDef.fromClass(Person.class)).subscribe(new Action1<Person>() {
@Override
public void call(Person t1) {
try {
result.add(t1);
} catch (Exception e) {
e.printStackTrace();
}
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable t1) {
t1.printStackTrace();
}
}
);
Thread.sleep(2000);
assertEquals(Lists.newArrayList(EmbeddedResources.defaultPerson), result);
}
@Test
public void testWithOverrideDeserializer() throws Exception {
URI uri = new URI(SERVICE_URI + "testAsync/person");
DefaultClientConfigImpl overrideConfig = new DefaultClientConfigImpl();
overrideConfig.setTypedProperty(CommonClientConfigKey.Deserializer, new StringDeserializer());
overrideConfig.setPropertyWithType(CommonClientConfigKey.Deserializer, StringDeserializer.getInstance());
HttpRequest request = HttpRequest.newBuilder().uri(uri).overrideConfig(overrideConfig).build();
RxNettyHttpClient observableClient = new RxNettyHttpClient();
final List<String> result = Lists.newArrayList();
......@@ -261,11 +294,10 @@ public class NettyClientTest {
@Test
public void testObservableWithMultipleServers() throws Exception {
IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues().withProperty(CommonClientConfigKey.ConnectTimeout, "1000");
RxNettyHttpClient observableClient = new RxNettyHttpClient(config);
URI uri = new URI("/testAsync/person");
HttpRequest request = HttpRequest.newBuilder().uri(uri).build();
RxNettyHttpLoadBalancingClient lbObservables = new RxNettyHttpLoadBalancingClient(observableClient);
RxNettyHttpLoadBalancingClient lbObservables = new RxNettyHttpLoadBalancingClient(config);
BaseLoadBalancer lb = new BaseLoadBalancer(new DummyPing(), new AvailabilityFilteringRule());
Server badServer = new Server("localhost:12345");
Server goodServer = new Server("localhost:" + port);
......@@ -292,14 +324,79 @@ public class NettyClientTest {
assertEquals(0, stats.getSuccessiveConnectionFailureCount());
}
@Test
public void testHttpResponseObservableWithMultipleServers() throws Exception {
IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues().withProperty(CommonClientConfigKey.ConnectTimeout, "1000");
URI uri = new URI("/testAsync/person");
HttpRequest request = HttpRequest.newBuilder().uri(uri).build();
RxNettyHttpLoadBalancingClient lbObservables = new RxNettyHttpLoadBalancingClient(config);
BaseLoadBalancer lb = new BaseLoadBalancer(new DummyPing(), new AvailabilityFilteringRule());
Server badServer = new Server("localhost:12345");
Server goodServer = new Server("localhost:" + port);
List<Server> servers = Lists.newArrayList(badServer, badServer, badServer, goodServer);
lb.setServersList(servers);
lbObservables.setLoadBalancer(lb);
lbObservables.setMaxAutoRetries(1);
lbObservables.setMaxAutoRetriesNextServer(3);
Observable<HttpResponse> observableWithRetries = lbObservables.createFullHttpResponseObservable(request);
ObserverWithLatch<HttpResponse> observer = new ObserverWithLatch<HttpResponse>();
observableWithRetries.subscribe(observer);
observer.await();
assertEquals(200, observer.obj.getStatus());
}
@Test
public void testLoadBalancingObservablesWithReadTimeout() throws Exception {
MockWebServer server = new MockWebServer();
String content = "{\"name\": \"ribbon\", \"age\": 2}";
server.enqueue(new MockResponse().setResponseCode(200).setHeader("Content-type", "application/json")
.setBody(content));
server.play();
IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues()
.setPropertyWithType(CommonClientConfigKey.ReadTimeout, 100);
URI uri = new URI("/testAsync/readTimeout");
HttpRequest request = HttpRequest.newBuilder().uri(uri).build();
RxNettyHttpLoadBalancingClient lbObservables = new RxNettyHttpLoadBalancingClient(config);
BaseLoadBalancer lb = new BaseLoadBalancer(new DummyPing(), new AvailabilityFilteringRule());
Server goodServer = new Server("localhost:" + server.getPort());
Server badServer = new Server("localhost:" + port);
List<Server> servers = Lists.newArrayList(goodServer, badServer, badServer, goodServer);
lb.setServersList(servers);
lbObservables.setLoadBalancer(lb);
lbObservables.setMaxAutoRetries(1);
lbObservables.setMaxAutoRetriesNextServer(3);
Observable<Person> observableWithRetries = lbObservables.createEntityObservable(request, TypeDef.fromClass(Person.class));
ObserverWithLatch<Person> observer = new ObserverWithLatch<Person>();
observableWithRetries.subscribe(observer);
observer.await();
assertEquals("ribbon", observer.obj.name);
assertEquals(2, observer.obj.age);
ServerStats stats = lbObservables.getServerStats(badServer);
server.shutdown();
// two requests to bad server because retry same server is set to 1
assertEquals(4, stats.getTotalRequestsCount());
assertEquals(0, stats.getActiveRequestsCount());
assertEquals(4, stats.getSuccessiveConnectionFailureCount());
stats = lbObservables.getServerStats(goodServer);
// two requests to bad server because retry same server is set to 1
assertEquals(1, stats.getTotalRequestsCount());
assertEquals(0, stats.getActiveRequestsCount());
assertEquals(0, stats.getSuccessiveConnectionFailureCount());
}
@Test
public void testObservableWithMultipleServersFailed() throws Exception {
IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues().withProperty(CommonClientConfigKey.ConnectTimeout, "1000");
RxNettyHttpClient observableClient = new RxNettyHttpClient(config);
URI uri = new URI("/testAsync/person");
HttpRequest request = HttpRequest.newBuilder().uri(uri).build();
RxNettyHttpLoadBalancingClient lbObservables = new RxNettyHttpLoadBalancingClient(observableClient);
RxNettyHttpLoadBalancingClient lbObservables = new RxNettyHttpLoadBalancingClient(config);
BaseLoadBalancer lb = new BaseLoadBalancer(new DummyPing(), new AvailabilityFilteringRule());
Server badServer = new Server("localhost:12345");
Server badServer1 = new Server("localhost:12346");
......@@ -326,7 +423,7 @@ public class NettyClientTest {
@Test
public void testStream() throws Exception {
HttpRequest request = HttpRequest.newBuilder().uri(SERVICE_URI + "testAsync/personStream")
.overrideConfig(new DefaultClientConfigImpl().setTypedProperty(CommonClientConfigKey.Deserializer, JacksonCodec.getInstance()))
.overrideConfig(new DefaultClientConfigImpl().setPropertyWithType(CommonClientConfigKey.Deserializer, JacksonCodec.getInstance()))
.build();
RxNettyHttpClient observableClient = new RxNettyHttpClient();
final List<Person> result = Lists.newArrayList();
......@@ -339,6 +436,32 @@ public class NettyClientTest {
assertEquals(EmbeddedResources.entityStream, result);
}
@Test
public void testStreamWithLoadBalancer() throws Exception {
IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues().withProperty(CommonClientConfigKey.ConnectTimeout, "1000");
RxNettyHttpLoadBalancingClient lbObservables = new RxNettyHttpLoadBalancingClient(config);
HttpRequest request = HttpRequest.newBuilder().uri("/testAsync/personStream")
.overrideConfig(new DefaultClientConfigImpl().setPropertyWithType(CommonClientConfigKey.Deserializer, JacksonCodec.getInstance()))
.build();
final List<Person> result = Lists.newArrayList();
BaseLoadBalancer lb = new BaseLoadBalancer(new DummyPing(), new AvailabilityFilteringRule());
Server goodServer = new Server("localhost:" + port);
Server badServer = new Server("localhost:12245");
List<Server> servers = Lists.newArrayList(badServer, badServer, badServer, goodServer);
lb.setServersList(servers);
lbObservables.setLoadBalancer(lb);
lbObservables.setMaxAutoRetries(1);
lbObservables.setMaxAutoRetriesNextServer(3);
lbObservables.createServerSentEventEntityObservable(request, TypeDef.fromClass(Person.class)).toBlockingObservable().forEach(new Action1<ServerSentEvent<Person>>() {
@Override
public void call(ServerSentEvent<Person> t1) {
result.add(t1.getEntity());
}
});
assertEquals(EmbeddedResources.entityStream, result);
}
@Test
public void testNoEntity() throws Exception {
......@@ -403,6 +526,29 @@ public class NettyClientTest {
assertNull(p);
assertTrue(throwable.get() instanceof UnexpectedHttpResponseException);
assertEquals("Service Unavailable", throwable.get().getMessage());
assertEquals(503, ((UnexpectedHttpResponseException) throwable.get()).getStatusCode());
UnexpectedHttpResponseException ex = (UnexpectedHttpResponseException) throwable.get();
assertEquals(503, ex.getStatusCode());
String body = ex.getResponse().getEntity(String.class);
assertEquals("Rate exceeds limit", body);
}
@Test
public void testUnexpectedResponse() throws Exception {
URI uri = new URI(SERVICE_URI + "testAsync/throttle");
HttpRequest request = HttpRequest.newBuilder().uri(uri).build();
RxNettyHttpClient client = new RxNettyHttpClient();
Observable<HttpResponse> responseObservable = client.createFullHttpResponseObservable(request);
final AtomicReference<HttpResponse> response = new AtomicReference<HttpResponse>();
responseObservable.toBlockingObservable().forEach(new Action1<HttpResponse>() {
@Override
public void call(HttpResponse t1) {
response.set(t1);
}
});
assertEquals(503, response.get().getStatus());
String body = response.get().getEntity(String.class);
assertEquals("Rate exceeds limit", body);
}
}
......@@ -140,7 +140,7 @@ public class EmbeddedResources {
@GET
@Path("/throttle")
public Response throttle() {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("Rate exceeds limit").build();
}
@GET
......@@ -160,6 +160,12 @@ public class EmbeddedResources {
};
}
@GET
@Path("/redirect")
public Response redirect(@QueryParam("port") int port) {
return Response.status(301).header("Location", "http://localhost:" + port + "/testAsync/person").build();
}
@GET
@Path("/personStream")
@Produces("text/event-stream")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册