diff --git a/build.gradle b/build.gradle index 29ac47ebdae238d5ea469be0854789adecd93ed5..7487fd576e49e3459b7341b7531d81e39c16cdc4 100644 --- a/build.gradle +++ b/build.gradle @@ -41,7 +41,7 @@ subprojects { project(':ribbon-core') { dependencies { compile 'com.netflix.netflix-commons:netflix-statistics:0.1.1' - compile 'com.netflix.rxjava:rxjava-core:0.14.2' + compile 'com.netflix.rxjava:rxjava-core:0.16.1' } } @@ -65,6 +65,7 @@ project(':ribbon-rxnetty') { dependencies { compile project(':ribbon-core') compile 'com.netflix.rxnetty:rx-netty:0.2.1' + testCompile 'com.google.mockwebserver:mockwebserver:20130706' testCompile project(':ribbon-test') } } diff --git a/ribbon-core/src/main/java/com/netflix/client/LoadBalancerContext.java b/ribbon-core/src/main/java/com/netflix/client/LoadBalancerContext.java index 65d5d197d2e1785d37e467321859e48bc7132bee..4d8d9c74f6bad4234d71550aa5d5a50f5d2f1273 100644 --- a/ribbon-core/src/main/java/com/netflix/client/LoadBalancerContext.java +++ b/ribbon-core/src/main/java/com/netflix/client/LoadBalancerContext.java @@ -275,7 +275,49 @@ public abstract class LoadBalancerContext errorHandler = getErrorHandler(); + if (errorHandler != null && e != null) { + if (errorHandler.isCircuitTrippingException(e)) { + stats.incrementSuccessiveConnectionFailureCount(); + stats.addToFailureCount(); + } else { + stats.clearSuccessiveConnectionFailureCount(); + } + } + } catch (Throwable ex) { + logger.error("Unexpected exception", ex); + } + } + + /** + * This is called after a response is received from the client + * to update related stats. + */ + protected void noteResponse(ServerStats stats, ClientRequest request, Object response, long responseTime) { + try { + recordStats(stats, responseTime); + LoadBalancerErrorHandler errorHandler = getErrorHandler(); + if (errorHandler != null && response != null) { + if (errorHandler.isCircuitTrippinResponse(response)) { + stats.incrementSuccessiveConnectionFailureCount(); + stats.addToFailureCount(); + } else { + stats.clearSuccessiveConnectionFailureCount(); + } + } + } catch (Throwable ex) { + logger.error("Unexpected exception", ex); + } + } + /** * This is usually called just before client execute a request. */ diff --git a/ribbon-core/src/main/java/com/netflix/client/config/DefaultClientConfigImpl.java b/ribbon-core/src/main/java/com/netflix/client/config/DefaultClientConfigImpl.java index 23f1bf96cadbe8af46e0a6621710ca24919d66ec..03e52092257157451ac0a0bfb1e8a46eb7f9aa4e 100644 --- a/ribbon-core/src/main/java/com/netflix/client/config/DefaultClientConfigImpl.java +++ b/ribbon-core/src/main/java/com/netflix/client/config/DefaultClientConfigImpl.java @@ -792,7 +792,7 @@ public class DefaultClientConfigImpl implements IClientConfig { @SuppressWarnings("unchecked") @Override - public T getTypedProperty(IClientConfigKey key) { + public T getPropertyWithType(IClientConfigKey key) { Object obj = properties.get(key.key()); Class type = key.type(); try { @@ -800,14 +800,17 @@ public class DefaultClientConfigImpl implements IClientConfig { } catch (ClassCastException e) { if (obj instanceof String) { String stringValue = (String) obj; - if (Integer.class.isAssignableFrom(type)) { + if (Integer.class.equals(type)) { return (T) Integer.valueOf(stringValue); - } else if (Boolean.class.isAssignableFrom(type)) { + } else if (Boolean.class.equals(type)) { return (T) Boolean.valueOf(stringValue); - } else if (Float.class.isAssignableFrom(type)) { + } else if (Float.class.equals(type)) { return (T) Float.valueOf(stringValue); - } else if (Long.class.isAssignableFrom(type)) { + } else if (Long.class.equals(type)) { return (T) Long.valueOf(stringValue); + } else if (Double.class.equals(type)) { + return (T) Double.valueOf(stringValue); + } throw new IllegalArgumentException("Unable to convert string value to desired type " + type); } else { @@ -817,8 +820,17 @@ public class DefaultClientConfigImpl implements IClientConfig { } @Override - public IClientConfig setTypedProperty(IClientConfigKey key, T value) { + public IClientConfig setPropertyWithType(IClientConfigKey key, T value) { properties.put(key.key(), value); return this; } + + @Override + public T getPropertyWithType(IClientConfigKey key, T defaultValue) { + T value = getPropertyWithType(key); + if (value == null) { + value = defaultValue; + } + return value; + } } diff --git a/ribbon-core/src/main/java/com/netflix/client/config/IClientConfig.java b/ribbon-core/src/main/java/com/netflix/client/config/IClientConfig.java index 69c587448ee9b04745f0479097794138b391e4e1..9c22d1f123bcc209bcc66200a437be1cfbcd9d66 100644 --- a/ribbon-core/src/main/java/com/netflix/client/config/IClientConfig.java +++ b/ribbon-core/src/main/java/com/netflix/client/config/IClientConfig.java @@ -40,8 +40,16 @@ public interface IClientConfig { public Map getProperties(); + /** + * @deprecated use {@link #setPropertyWithType(IClientConfigKey, Object)} + */ + @Deprecated public void setProperty(IClientConfigKey key, Object value); + /** + * @deprecated use {@link #getPropertyWithType(IClientConfigKey)} + */ + @Deprecated public Object getProperty(IClientConfigKey key); public Object getProperty(IClientConfigKey key, Object defaultVal); @@ -60,11 +68,17 @@ public interface IClientConfig { public boolean getPropertyAsBoolean(IClientConfigKey key, boolean defaultValue); /** - * Returns a typed property. This property must be set by {@link #setTypedProperty(IClientConfigKey, Object)}. - * If the property of IClientConfigKey is not set, or is set by {@link #loadProperties(String)} or {@link #setProperty(IClientConfigKey, Object)}, - * this will return null. + * Returns a typed property. This property must be set by {@link #setPropertyWithType(IClientConfigKey, Object)}. + * If the property of IClientConfigKey is not set, it returns null. */ - public T getTypedProperty(IClientConfigKey key); + public T getPropertyWithType(IClientConfigKey key); + + /** + * Returns a typed property. This property must be set by {@link #setPropertyWithType(IClientConfigKey, Object)}. + * If the property of IClientConfigKey is not set, it returns the default value passed in as the parameter. + */ + public T getPropertyWithType(IClientConfigKey key, T defaultValue); + - public IClientConfig setTypedProperty(IClientConfigKey key, T value); + public IClientConfig setPropertyWithType(IClientConfigKey key, T value); } diff --git a/ribbon-core/src/main/java/com/netflix/client/http/HttpRequest.java b/ribbon-core/src/main/java/com/netflix/client/http/HttpRequest.java index abb6c74f8fb5bea7ca0c551a9aa422fe890052e7..967368223aa1278ffeaaff383709bc9953f9ba0c 100644 --- a/ribbon-core/src/main/java/com/netflix/client/http/HttpRequest.java +++ b/ribbon-core/src/main/java/com/netflix/client/http/HttpRequest.java @@ -70,6 +70,13 @@ public class HttpRequest extends ClientRequest { private HttpRequest request = new HttpRequest(); + public Builder() { + } + + public Builder(HttpRequest request) { + this.request = request; + } + public Builder uri(URI uri) { request.setUri(uri); return this; @@ -109,10 +116,20 @@ public class HttpRequest extends ClientRequest { return this; } + /** + * @deprecated see {@link #queryParam(String, String)} + */ + @Deprecated public Builder queryParams(String name, String value) { request.queryParams.put(name, value); return this; } + + public Builder queryParam(String name, String value) { + request.queryParams.put(name, value); + return this; + } + public Builder entity(Object entity, TypeDef entityType) { request.entity = entity; @@ -186,6 +203,11 @@ public class HttpRequest extends ClientRequest { public static Builder newBuilder() { return new Builder(); } + + public static Builder newBuilder(HttpRequest toCopy) { + return new Builder(toCopy); + } + /** * Return a new instance of HttpRequest replacing the URI. diff --git a/ribbon-core/src/main/java/com/netflix/client/http/HttpResponse.java b/ribbon-core/src/main/java/com/netflix/client/http/HttpResponse.java index 6f62c125cab44a9c2c76b2d44e0cbeeba704e972..fd0101761cb5bdfb62545eb999da4d91a4713e7b 100644 --- a/ribbon-core/src/main/java/com/netflix/client/http/HttpResponse.java +++ b/ribbon-core/src/main/java/com/netflix/client/http/HttpResponse.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Map; import com.netflix.client.ResponseWithTypedEntity; +import com.netflix.serialization.SerializationUtils; import com.netflix.serialization.TypeDef; /** @@ -37,8 +38,14 @@ public interface HttpResponse extends ResponseWithTypedEntity, Closeable { */ public int getStatus(); + /** + * Get the reason phrase of HTTP status + */ public String getStatusLine(); + /** + * @see #getHttpHeaders() + */ @Override @Deprecated public Map> getHeaders(); @@ -49,9 +56,15 @@ public interface HttpResponse extends ResponseWithTypedEntity, Closeable { public InputStream getInputStream(); + /** + * @deprecated See APIs in {@link SerializationUtils} to deserialize HTTP content + */ @Deprecated public T getEntity(Class type) throws Exception; + /** + * @deprecated See APIs in {@link SerializationUtils} to deserialize HTTP content + */ @Deprecated public T getEntity(TypeDef type) throws Exception; } diff --git a/ribbon-core/src/main/java/com/netflix/serialization/JacksonSerializationFactory.java b/ribbon-core/src/main/java/com/netflix/serialization/JacksonSerializationFactory.java index 86c0249ce713f35612b59bd91d20fc38d2a865d8..a0f524b93c3afab137009649ae552fcdb2a141c2 100644 --- a/ribbon-core/src/main/java/com/netflix/serialization/JacksonSerializationFactory.java +++ b/ribbon-core/src/main/java/com/netflix/serialization/JacksonSerializationFactory.java @@ -19,12 +19,10 @@ package com.netflix.serialization; public class JacksonSerializationFactory implements SerializationFactory{ - public static final JacksonCodec instance = JacksonCodec.getInstance(); - @Override public Deserializer getDeserializer(HttpSerializationContext key, TypeDef typeDef) { if (key.getContentType().equalsIgnoreCase("application/json")) { - return instance; + return JacksonCodec.getInstance(); } return null; } @@ -32,7 +30,7 @@ public class JacksonSerializationFactory implements SerializationFactory Serializer getSerializer(HttpSerializationContext key, TypeDef typeDef) { if (key.getContentType().equalsIgnoreCase("application/json")) { - return instance; + return JacksonCodec.getInstance(); } return null; } diff --git a/ribbon-core/src/main/java/com/netflix/serialization/SerializationUtils.java b/ribbon-core/src/main/java/com/netflix/serialization/SerializationUtils.java index 666145a152de0f800787cf465448d5b100d9f8fd..c4cc341caabfe9587087301b4b4811e5cf2477ea 100644 --- a/ribbon-core/src/main/java/com/netflix/serialization/SerializationUtils.java +++ b/ribbon-core/src/main/java/com/netflix/serialization/SerializationUtils.java @@ -1,6 +1,7 @@ package com.netflix.serialization; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import com.netflix.client.config.CommonClientConfigKey; @@ -16,19 +17,29 @@ public class SerializationUtils { return deserializer.deserialize(in, typeDef); } + public static String serializeToString(Serializer serializer, T obj, TypeDef typeDef) + throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + serializer.serialize(out, obj, typeDef); + return new String(out.toByteArray(), "UTF-8"); + } + public static T getEntity(HttpResponse response, TypeDef type, Deserializer deserializer) throws IOException { return deserializer.deserialize(response.getInputStream(), type); } - public static Deserializer getDeserializer(HttpRequest request, HttpHeaders responseHeaders, TypeDef typeDef, SerializationFactory serializationFactory) { + public static Deserializer getDeserializer(HttpRequest request, HttpHeaders responseHeaders, TypeDef typeDef, + SerializationFactory serializationFactory) { Deserializer deserializer = null; if (request.getOverrideConfig() != null) { - deserializer = request.getOverrideConfig().getTypedProperty(CommonClientConfigKey.Deserializer); + deserializer = (Deserializer) request.getOverrideConfig().getPropertyWithType(CommonClientConfigKey.Deserializer); } if (deserializer == null && serializationFactory != null) { deserializer = serializationFactory.getDeserializer(new HttpSerializationContext(responseHeaders, request.getUri()), typeDef); } + if (deserializer == null && typeDef.getRawType().equals(String.class)) { + return (Deserializer) StringDeserializer.getInstance(); + } return deserializer; } - } diff --git a/ribbon-core/src/main/java/com/netflix/serialization/StringDeserializer.java b/ribbon-core/src/main/java/com/netflix/serialization/StringDeserializer.java index ad9210ea5d81697d1040c3639cf508a623336ff5..47c63ce05291c01c3f48e76d065ad0f116d87980 100644 --- a/ribbon-core/src/main/java/com/netflix/serialization/StringDeserializer.java +++ b/ribbon-core/src/main/java/com/netflix/serialization/StringDeserializer.java @@ -9,6 +9,16 @@ import com.google.common.io.CharStreams; import com.google.common.io.Closeables; public class StringDeserializer implements Deserializer { + + private static final StringDeserializer instance = new StringDeserializer(); + + private StringDeserializer() { + } + + public static final StringDeserializer getInstance() { + return instance; + } + @Override public String deserialize(InputStream in, TypeDef type) throws IOException { diff --git a/ribbon-core/src/test/java/com/netflix/client/config/DefaultClientConfigImplTest.java b/ribbon-core/src/test/java/com/netflix/client/config/DefaultClientConfigImplTest.java index 63588ffff6ffbcba1c95d5c30e50ca3c5ca8df49..091bcb2d18724659620c470ba5310619f989ee61 100644 --- a/ribbon-core/src/test/java/com/netflix/client/config/DefaultClientConfigImplTest.java +++ b/ribbon-core/src/test/java/com/netflix/client/config/DefaultClientConfigImplTest.java @@ -22,9 +22,9 @@ public class DefaultClientConfigImplTest { DefaultClientConfigImpl config = new DefaultClientConfigImpl(); config.loadProperties("myclient"); assertEquals("1000", config.getProperty(CommonClientConfigKey.ConnectTimeout)); - assertEquals(1000, config.getTypedProperty(CommonClientConfigKey.ConnectTimeout).intValue()); - config.setTypedProperty(CommonClientConfigKey.ConnectTimeout, 2000); - assertEquals(2000, config.getTypedProperty(CommonClientConfigKey.ConnectTimeout).intValue()); + assertEquals(1000, config.getPropertyWithType(CommonClientConfigKey.ConnectTimeout).intValue()); + config.setPropertyWithType(CommonClientConfigKey.ConnectTimeout, 2000); + assertEquals(2000, config.getPropertyWithType(CommonClientConfigKey.ConnectTimeout).intValue()); } @Test diff --git a/ribbon-httpclient/src/main/java/com/netflix/niws/client/http/HttpClientResponse.java b/ribbon-httpclient/src/main/java/com/netflix/niws/client/http/HttpClientResponse.java index 075f25e04c79cb690428c45b3357e4151a3c68dd..8dc63abe581dee9290bb6a9c137241a3eba8e2b4 100644 --- a/ribbon-httpclient/src/main/java/com/netflix/niws/client/http/HttpClientResponse.java +++ b/ribbon-httpclient/src/main/java/com/netflix/niws/client/http/HttpClientResponse.java @@ -31,10 +31,16 @@ import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; import com.netflix.client.ClientException; +import com.netflix.client.config.CommonClientConfigKey; +import com.netflix.client.config.IClientConfig; import com.netflix.client.http.HttpHeaders; +import com.netflix.client.http.HttpRequest; import com.netflix.client.http.HttpResponse; +import com.netflix.serialization.Deserializer; +import com.netflix.serialization.Serializer; import com.netflix.serialization.TypeDef; import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.GenericType; import com.sun.jersey.api.client.UniformInterfaceException; /** @@ -45,15 +51,17 @@ import com.sun.jersey.api.client.UniformInterfaceException; */ class HttpClientResponse implements HttpResponse { - private ClientResponse bcr = null; - - private URI requestedURI; // the request url that got this response - - private Multimap headers = ArrayListMultimap.create(); - private HttpHeaders httpHeaders; - - public HttpClientResponse(ClientResponse cr){ + private final ClientResponse bcr; + + private final Multimap headers = ArrayListMultimap.create(); + private final HttpHeaders httpHeaders; + private final URI requestedURI; + private final IClientConfig overrideConfig; + + public HttpClientResponse(ClientResponse cr, URI requestedURI, IClientConfig config){ bcr = cr; + this.requestedURI = requestedURI; + this.overrideConfig = config; for (Map.Entry> entry: bcr.getHeaders().entrySet()) { if (entry.getKey() != null && entry.getValue() != null) { headers.putAll(entry.getKey(), entry.getValue()); @@ -101,13 +109,7 @@ class HttpClientResponse implements HttpResponse { public T getEntity(Class c) throws Exception { - T t = null; - try { - t = this.bcr.getEntity(c); - } catch (UniformInterfaceException e) { - throw new ClientException(ClientException.ErrorType.GENERAL, e.getMessage(), e.getCause()); - } - return t; + return getEntity(TypeDef.fromClass(c)); } @Override @@ -133,13 +135,9 @@ class HttpClientResponse implements HttpResponse { @Override public URI getRequestedURI() { - return this.requestedURI; + return requestedURI; } - public void setRequestedURI(URI requestedURI) { - this.requestedURI = requestedURI; - } - @Override public Object getPayload() throws ClientException { if (hasEntity()) { @@ -166,7 +164,19 @@ class HttpClientResponse implements HttpResponse { @SuppressWarnings("unchecked") @Override public T getEntity(TypeDef type) throws Exception { - return (T) getEntity(type.getRawType()); + if (overrideConfig != null) { + Deserializer deserializer = overrideConfig.getPropertyWithType(CommonClientConfigKey.Deserializer, null); + if (deserializer != null) { + return (T) deserializer.deserialize(getInputStream(), type); + } + } + T t = null; + try { + t = this.bcr.getEntity(new GenericType(type.getType()){}); + } catch (UniformInterfaceException e) { + throw new ClientException(ClientException.ErrorType.GENERAL, e.getMessage(), e.getCause()); + } + return t; } @Override diff --git a/ribbon-httpclient/src/main/java/com/netflix/niws/client/http/RestClient.java b/ribbon-httpclient/src/main/java/com/netflix/niws/client/http/RestClient.java index 8bec66fae0bcff3eea159aed6467bb867c8ab6d9..c725239071f2d3de810f9bb6b094e4d11b621aeb 100644 --- a/ribbon-httpclient/src/main/java/com/netflix/niws/client/http/RestClient.java +++ b/ribbon-httpclient/src/main/java/com/netflix/niws/client/http/RestClient.java @@ -72,6 +72,9 @@ import com.netflix.niws.cert.AbstractSslContextFactory; import com.netflix.niws.client.ClientSslSocketFactoryException; import com.netflix.niws.client.URLSslContextFactory; import com.netflix.niws.client.http.HttpClientRequest.Verb; +import com.netflix.serialization.SerializationUtils; +import com.netflix.serialization.Serializer; +import com.netflix.serialization.TypeDef; import com.netflix.util.Pair; import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientResponse; @@ -507,7 +510,7 @@ public class RestClient extends AbstractLoadBalancerAwareClient> headers, Map> params, - IClientConfig overriddenClientConfig, Object requestEntity) throws Exception { + IClientConfig overriddenClientConfig, Object requestEntity, TypeDef entityType) throws Exception { HttpClientResponse thisResponse = null; boolean bbFollowRedirects = bFollowRedirects; // read overriden props @@ -597,18 +600,28 @@ public class RestClient extends AbstractLoadBalancerAwareClient extends MessageToMessageDecoder out) throws Exception { int statusCode = msg.getStatus().code(); if (type.getRawType().isAssignableFrom(HttpResponse.class)) { - msg.content().retain(); out.add(new NettyHttpResponse(msg, msg.content(), serializationFactory, request)); } else if (statusCode >= 200 && statusCode < 300) { if (msg.content().isReadable()) { diff --git a/ribbon-rxnetty/src/main/java/com/netflix/client/netty/http/NettyHttpResponse.java b/ribbon-rxnetty/src/main/java/com/netflix/client/netty/http/NettyHttpResponse.java index fe885b5866e577b514410e41277fe2747ca2dd80..6a2bccfbe1d402b097bd6629535d9193e542c86b 100644 --- a/ribbon-rxnetty/src/main/java/com/netflix/client/netty/http/NettyHttpResponse.java +++ b/ribbon-rxnetty/src/main/java/com/netflix/client/netty/http/NettyHttpResponse.java @@ -8,9 +8,7 @@ import io.netty.handler.codec.http.HttpResponseStatus; import java.io.InputStream; import java.net.URI; import java.util.Collection; -import java.util.List; import java.util.Map; -import java.util.Map.Entry; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; @@ -33,6 +31,9 @@ class NettyHttpResponse implements ResponseWithTypedEntity, com.netflix.client.h public NettyHttpResponse(HttpResponse response, ByteBuf content, SerializationFactory serializationFactory, HttpRequest request) { this.response = response; this.content = content; + if (content != null) { + content.retain(); + } this.serializationFactory = serializationFactory; this.request = request; } diff --git a/ribbon-rxnetty/src/main/java/com/netflix/client/netty/http/RxNettyHttpClient.java b/ribbon-rxnetty/src/main/java/com/netflix/client/netty/http/RxNettyHttpClient.java index a99186dd482863c4579401e3946bd1497bbc7d9b..cfebb6908eb866e864734ded72eb637da1b198af 100644 --- a/ribbon-rxnetty/src/main/java/com/netflix/client/netty/http/RxNettyHttpClient.java +++ b/ribbon-rxnetty/src/main/java/com/netflix/client/netty/http/RxNettyHttpClient.java @@ -1,5 +1,6 @@ package com.netflix.client.netty.http; +import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelOption; @@ -32,9 +33,12 @@ import com.netflix.client.ClientException; import com.netflix.client.config.CommonClientConfigKey; import com.netflix.client.config.DefaultClientConfigImpl; import com.netflix.client.config.IClientConfig; +import com.netflix.client.config.IClientConfigKey; import com.netflix.client.http.HttpRequest; import com.netflix.client.http.HttpResponse; import com.netflix.client.http.UnexpectedHttpResponseException; +import com.netflix.client.http.HttpRequest.Builder; +import com.netflix.client.http.HttpRequest.Verb; import com.netflix.serialization.Deserializer; import com.netflix.serialization.HttpSerializationContext; import com.netflix.serialization.JacksonSerializationFactory; @@ -52,45 +56,47 @@ public class RxNettyHttpClient { private IClientConfig config; public RxNettyHttpClient() { - this(DefaultClientConfigImpl.getClientConfigWithDefaultValues(), new JacksonSerializationFactory()); + this(DefaultClientConfigImpl.getClientConfigWithDefaultValues(), new JacksonSerializationFactory(), + new Bootstrap().group(new NioEventLoopGroup())); } public RxNettyHttpClient(IClientConfig config) { - this(config, new JacksonSerializationFactory()); + this(config, new JacksonSerializationFactory(), new Bootstrap().group(new NioEventLoopGroup())); } - public RxNettyHttpClient(IClientConfig config, SerializationFactory serializationFactory) { + public RxNettyHttpClient(IClientConfig config, SerializationFactory serializationFactory, + Bootstrap bootStrap) { this.config = config; this.connectTimeout = config.getPropertyAsInteger(CommonClientConfigKey.ConnectTimeout, DefaultClientConfigImpl.DEFAULT_CONNECT_TIMEOUT); this.readTimeout = config.getPropertyAsInteger(CommonClientConfigKey.ReadTimeout, DefaultClientConfigImpl.DEFAULT_READ_TIMEOUT); this.serializationFactory = serializationFactory; this.observableClient = ObservableHttpClient.newBuilder() .withChannelOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout) - .build(new NioEventLoopGroup()); + .build(bootStrap.group()); } private class SingleEntityHandler extends HttpProtocolHandlerAdapter { - private HttpEntityDecoder decoder; private HttpRequest request; + private TypeDef typeDef; private SingleEntityHandler(HttpRequest request, SerializationFactory serializationFactory, TypeDef typeDef) { - decoder = new HttpEntityDecoder(serializationFactory, request, typeDef); this.request = request; + this.typeDef = typeDef; } @Override public void configure(ChannelPipeline pipeline) { int timeout = readTimeout; if (request.getOverrideConfig() != null) { - Integer overrideTimeout = request.getOverrideConfig().getTypedProperty(CommonClientConfigKey.ReadTimeout); + Integer overrideTimeout = request.getOverrideConfig().getPropertyWithType(CommonClientConfigKey.ReadTimeout); if (overrideTimeout != null) { timeout = overrideTimeout.intValue(); } } pipeline.addAfter("http-response-decoder", "http-aggregator", new HttpObjectAggregator(Integer.MAX_VALUE)); pipeline.addAfter("http-aggregator", SelfRemovingResponseTimeoutHandler.NAME, new SelfRemovingResponseTimeoutHandler(timeout, TimeUnit.MILLISECONDS)); - pipeline.addAfter(SelfRemovingResponseTimeoutHandler.NAME, "entity-decoder", decoder); + pipeline.addAfter(SelfRemovingResponseTimeoutHandler.NAME, "entity-decoder", new HttpEntityDecoder(serializationFactory, request, typeDef)); } } @@ -110,27 +116,36 @@ public class RxNettyHttpClient { uri = encoder.toString(); } if (entity != null) { - Serializer serializer = null; - if (request.getOverrideConfig() != null) { - serializer = request.getOverrideConfig().getTypedProperty(CommonClientConfigKey.Serializer); - } - if (serializer == null) { - HttpSerializationContext key = new HttpSerializationContext(request.getHttpHeaders(), request.getUri()); - serializer = serializationFactory.getSerializer(key, request.getEntityType()); - } - if (serializer == null) { - throw new ClientException("Unable to find serializer"); - } - ByteArrayOutputStream bout = new ByteArrayOutputStream(); - try { - serializer.serialize(bout, entity, request.getEntityType()); - } catch (IOException e) { - throw new ClientException("Error serializing entity in request", e); + ByteBuf buf = null; + int contentLength = -1; + if (entity instanceof ByteBuf) { + buf = (ByteBuf) entity; + } else { + Serializer serializer = null; + if (request.getOverrideConfig() != null) { + serializer = request.getOverrideConfig().getPropertyWithType(CommonClientConfigKey.Serializer); + } + if (serializer == null) { + HttpSerializationContext key = new HttpSerializationContext(request.getHttpHeaders(), request.getUri()); + serializer = serializationFactory.getSerializer(key, request.getEntityType()); + } + if (serializer == null) { + throw new ClientException("Unable to find serializer"); + } + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + try { + serializer.serialize(bout, entity, request.getEntityType()); + } catch (IOException e) { + throw new ClientException("Error serializing entity in request", e); + } + byte[] content = bout.toByteArray(); + buf = Unpooled.wrappedBuffer(content); + contentLength = content.length; } - byte[] content = bout.toByteArray(); - ByteBuf buf = Unpooled.wrappedBuffer(content); r = new ValidatedFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(request.getVerb().name()), uri, buf); - r.headers().set(HttpHeaders.Names.CONTENT_LENGTH, content.length); + if (contentLength >= 0) { + r.headers().set(HttpHeaders.Names.CONTENT_LENGTH, contentLength); + } } else { r = new ValidatedFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(request.getVerb().name()), uri); } @@ -149,6 +164,10 @@ public class RxNettyHttpClient { return config; } + public SerializationFactory getSerializationFactory() { + return serializationFactory; + } + public Observable> createServerSentEventEntityObservable(final HttpRequest request, final TypeDef typeDef) { return createServerSentEventObservable(request) .flatMap(new Func1, Observable>>() { @@ -195,15 +214,63 @@ public class RxNettyHttpClient { }); } + private T getProperty(IClientConfigKey key, IClientConfig overrideConfig) { + T value = null; + if (overrideConfig != null) { + value = overrideConfig.getPropertyWithType(key); + } + if (value == null) { + value = getConfig().getPropertyWithType(key); + } + return value; + } + + private static class RedirectException extends RuntimeException { + public RedirectException(String message) { + super(message); + } + } - public Observable> createObservableHttpResponse(final HttpRequest request, HttpProtocolHandler protocolHandler) { + public Observable> createObservableHttpResponse(final HttpRequest request, final HttpProtocolHandler protocolHandler) { ValidatedFullHttpRequest r = null; try { r = getHttpRequest(request); } catch (final Exception e) { return Observable.error(e); } - return observableClient.execute(r, protocolHandler); + final Observable> observable = observableClient.execute(r, protocolHandler); + Boolean followRedirect = getProperty(CommonClientConfigKey.FollowRedirects, request.getOverrideConfig()); + if (followRedirect != null && followRedirect.booleanValue()) { + return observable.flatMap(new Func1, Observable>>() { + @Override + public Observable> call( + ObservableHttpResponse t1) { + int statusCode = t1.response().getStatus().code(); + switch (statusCode) { + case 301: + case 302: + case 303: + case 307: + case 308: + String location = t1.response().headers().get("Location"); + if (location == null) { + return Observable.error(new Exception("Location header is not set in the redirect response")); + } + + Builder builder = HttpRequest.newBuilder(request).uri(location); + if (statusCode == 303) { + // according to the spec, this must be done with GET + builder.verb(Verb.GET); + } + Observable> newObservable = createObservableHttpResponse(builder.build(), protocolHandler); + return newObservable; + default: break; + } + return Observable.from(t1); + } + }); + } else { + return observable; + } } - } diff --git a/ribbon-rxnetty/src/main/java/com/netflix/client/netty/http/RxNettyHttpLoadBalancingClient.java b/ribbon-rxnetty/src/main/java/com/netflix/client/netty/http/RxNettyHttpLoadBalancingClient.java index 8169e29d94ec83d0e665562edd6ece35dc094251..000b77378eba82685d4d3314cff4202add94f70c 100644 --- a/ribbon-rxnetty/src/main/java/com/netflix/client/netty/http/RxNettyHttpLoadBalancingClient.java +++ b/ribbon-rxnetty/src/main/java/com/netflix/client/netty/http/RxNettyHttpLoadBalancingClient.java @@ -1,5 +1,6 @@ package com.netflix.client.netty.http; +import io.netty.bootstrap.Bootstrap; import rx.Observable; import rx.netty.protocol.http.HttpProtocolHandler; import rx.netty.protocol.http.ObservableHttpResponse; @@ -7,28 +8,38 @@ import rx.netty.protocol.http.ObservableHttpResponse; import com.netflix.client.LoadBalancerErrorHandler; import com.netflix.client.LoadBalancerObservableRequest; import com.netflix.client.LoadBalancerObservables; +import com.netflix.client.config.DefaultClientConfigImpl; import com.netflix.client.config.IClientConfig; import com.netflix.client.http.HttpRequest; import com.netflix.client.http.HttpResponse; import com.netflix.loadbalancer.ILoadBalancer; import com.netflix.loadbalancer.Server; import com.netflix.loadbalancer.ServerStats; +import com.netflix.serialization.HttpSerializationContext; +import com.netflix.serialization.JacksonSerializationFactory; +import com.netflix.serialization.SerializationFactory; public class RxNettyHttpLoadBalancingClient extends RxNettyHttpClient { private LoadBalancerObservables lbObservables; - public RxNettyHttpLoadBalancingClient(IClientConfig config, LoadBalancerErrorHandler errorHandler) { + public RxNettyHttpLoadBalancingClient() { + this(DefaultClientConfigImpl.getClientConfigWithDefaultValues()); + } + + public RxNettyHttpLoadBalancingClient(IClientConfig config) { super(config); + lbObservables = new LoadBalancerObservables(config); + lbObservables.setErrorHandler(new NettyHttpLoadBalancerErrorHandler()); + } + + public RxNettyHttpLoadBalancingClient(IClientConfig config, LoadBalancerErrorHandler errorHandler, + SerializationFactory serializationFactory, Bootstrap bootStrap) { + super(config, serializationFactory, bootStrap); this.lbObservables = new LoadBalancerObservables(config); lbObservables.setErrorHandler(errorHandler); } - public RxNettyHttpLoadBalancingClient(RxNettyHttpClient client) { - this.lbObservables = new LoadBalancerObservables(client.getConfig()); - lbObservables.setErrorHandler(new NettyHttpLoadBalancerErrorHandler()); - } - private Observable> executeSuper(final HttpRequest request, HttpProtocolHandler protocolHandler) { return super.createObservableHttpResponse(request, protocolHandler); } diff --git a/ribbon-rxnetty/src/test/java/com/netflix/client/netty/NettyClientTest.java b/ribbon-rxnetty/src/test/java/com/netflix/client/netty/NettyClientTest.java index 6c360622bdfd8254bf17444c8952950feaa7fe47..9d5de151bda1bce508f6aa462a93ddada563b267 100644 --- a/ribbon-rxnetty/src/test/java/com/netflix/client/netty/NettyClientTest.java +++ b/ribbon-rxnetty/src/test/java/com/netflix/client/netty/NettyClientTest.java @@ -17,6 +17,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; import org.junit.BeforeClass; import org.junit.Test; @@ -25,6 +27,8 @@ import rx.util.functions.Action1; import rx.util.functions.Func1; import com.google.common.collect.Lists; +import com.google.mockwebserver.MockResponse; +import com.google.mockwebserver.MockWebServer; import com.netflix.client.ClientException; import com.netflix.client.LoadBalancerObservables; import com.netflix.client.config.CommonClientConfigKey; @@ -64,7 +68,7 @@ public class NettyClientTest { PackagesResourceConfig resourceConfig = new PackagesResourceConfig("com.netflix.ribbon.test.resources"); port = (new Random()).nextInt(1000) + 4000; SERVICE_URI = "http://localhost:" + port + "/"; - ExecutorService service = Executors.newFixedThreadPool(200); + ExecutorService service = Executors.newFixedThreadPool(20); try{ server = HttpServerFactory.create(SERVICE_URI, resourceConfig); server.setExecutor(service); @@ -73,6 +77,7 @@ public class NettyClientTest { e.printStackTrace(); fail("Unable to start server"); } + // LogManager.getRootLogger().setLevel((Level)Level.DEBUG); } @Test @@ -93,12 +98,40 @@ public class NettyClientTest { }); assertEquals(Lists.newArrayList(EmbeddedResources.defaultPerson), result); } - + + @Test + public void testRedirect() throws Exception { + URI uri = new URI(SERVICE_URI + "testAsync/redirect"); + HttpRequest request = HttpRequest.newBuilder().uri(uri).queryParam("port", String.valueOf(port)).build(); + RxNettyHttpClient observableClient = new RxNettyHttpClient(DefaultClientConfigImpl.getClientConfigWithDefaultValues().setPropertyWithType(CommonClientConfigKey.ReadTimeout, 1000000)); + final List result = Lists.newArrayList(); + observableClient.createEntityObservable(request, TypeDef.fromClass(Person.class)).subscribe(new Action1() { + @Override + public void call(Person t1) { + try { + result.add(t1); + } catch (Exception e) { + e.printStackTrace(); + } + } + }, new Action1() { + + @Override + public void call(Throwable t1) { + t1.printStackTrace(); + } + + } + ); + Thread.sleep(2000); + assertEquals(Lists.newArrayList(EmbeddedResources.defaultPerson), result); + } + @Test public void testWithOverrideDeserializer() throws Exception { URI uri = new URI(SERVICE_URI + "testAsync/person"); DefaultClientConfigImpl overrideConfig = new DefaultClientConfigImpl(); - overrideConfig.setTypedProperty(CommonClientConfigKey.Deserializer, new StringDeserializer()); + overrideConfig.setPropertyWithType(CommonClientConfigKey.Deserializer, StringDeserializer.getInstance()); HttpRequest request = HttpRequest.newBuilder().uri(uri).overrideConfig(overrideConfig).build(); RxNettyHttpClient observableClient = new RxNettyHttpClient(); final List result = Lists.newArrayList(); @@ -261,11 +294,10 @@ public class NettyClientTest { @Test public void testObservableWithMultipleServers() throws Exception { IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues().withProperty(CommonClientConfigKey.ConnectTimeout, "1000"); - RxNettyHttpClient observableClient = new RxNettyHttpClient(config); URI uri = new URI("/testAsync/person"); HttpRequest request = HttpRequest.newBuilder().uri(uri).build(); - RxNettyHttpLoadBalancingClient lbObservables = new RxNettyHttpLoadBalancingClient(observableClient); + RxNettyHttpLoadBalancingClient lbObservables = new RxNettyHttpLoadBalancingClient(config); BaseLoadBalancer lb = new BaseLoadBalancer(new DummyPing(), new AvailabilityFilteringRule()); Server badServer = new Server("localhost:12345"); Server goodServer = new Server("localhost:" + port); @@ -292,14 +324,79 @@ public class NettyClientTest { assertEquals(0, stats.getSuccessiveConnectionFailureCount()); } + @Test + public void testHttpResponseObservableWithMultipleServers() throws Exception { + IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues().withProperty(CommonClientConfigKey.ConnectTimeout, "1000"); + URI uri = new URI("/testAsync/person"); + HttpRequest request = HttpRequest.newBuilder().uri(uri).build(); + + RxNettyHttpLoadBalancingClient lbObservables = new RxNettyHttpLoadBalancingClient(config); + BaseLoadBalancer lb = new BaseLoadBalancer(new DummyPing(), new AvailabilityFilteringRule()); + Server badServer = new Server("localhost:12345"); + Server goodServer = new Server("localhost:" + port); + List servers = Lists.newArrayList(badServer, badServer, badServer, goodServer); + lb.setServersList(servers); + lbObservables.setLoadBalancer(lb); + lbObservables.setMaxAutoRetries(1); + lbObservables.setMaxAutoRetriesNextServer(3); + Observable observableWithRetries = lbObservables.createFullHttpResponseObservable(request); + ObserverWithLatch observer = new ObserverWithLatch(); + observableWithRetries.subscribe(observer); + observer.await(); + assertEquals(200, observer.obj.getStatus()); + } + + + @Test + public void testLoadBalancingObservablesWithReadTimeout() throws Exception { + MockWebServer server = new MockWebServer(); + String content = "{\"name\": \"ribbon\", \"age\": 2}"; + server.enqueue(new MockResponse().setResponseCode(200).setHeader("Content-type", "application/json") + .setBody(content)); + server.play(); + + IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues() + .setPropertyWithType(CommonClientConfigKey.ReadTimeout, 100); + URI uri = new URI("/testAsync/readTimeout"); + HttpRequest request = HttpRequest.newBuilder().uri(uri).build(); + + RxNettyHttpLoadBalancingClient lbObservables = new RxNettyHttpLoadBalancingClient(config); + BaseLoadBalancer lb = new BaseLoadBalancer(new DummyPing(), new AvailabilityFilteringRule()); + Server goodServer = new Server("localhost:" + server.getPort()); + Server badServer = new Server("localhost:" + port); + List servers = Lists.newArrayList(goodServer, badServer, badServer, goodServer); + lb.setServersList(servers); + lbObservables.setLoadBalancer(lb); + lbObservables.setMaxAutoRetries(1); + lbObservables.setMaxAutoRetriesNextServer(3); + Observable observableWithRetries = lbObservables.createEntityObservable(request, TypeDef.fromClass(Person.class)); + ObserverWithLatch observer = new ObserverWithLatch(); + observableWithRetries.subscribe(observer); + observer.await(); + assertEquals("ribbon", observer.obj.name); + assertEquals(2, observer.obj.age); + ServerStats stats = lbObservables.getServerStats(badServer); + server.shutdown(); + // two requests to bad server because retry same server is set to 1 + assertEquals(4, stats.getTotalRequestsCount()); + assertEquals(0, stats.getActiveRequestsCount()); + assertEquals(4, stats.getSuccessiveConnectionFailureCount()); + + stats = lbObservables.getServerStats(goodServer); + // two requests to bad server because retry same server is set to 1 + assertEquals(1, stats.getTotalRequestsCount()); + assertEquals(0, stats.getActiveRequestsCount()); + assertEquals(0, stats.getSuccessiveConnectionFailureCount()); + } + + @Test public void testObservableWithMultipleServersFailed() throws Exception { IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues().withProperty(CommonClientConfigKey.ConnectTimeout, "1000"); - RxNettyHttpClient observableClient = new RxNettyHttpClient(config); URI uri = new URI("/testAsync/person"); HttpRequest request = HttpRequest.newBuilder().uri(uri).build(); - RxNettyHttpLoadBalancingClient lbObservables = new RxNettyHttpLoadBalancingClient(observableClient); + RxNettyHttpLoadBalancingClient lbObservables = new RxNettyHttpLoadBalancingClient(config); BaseLoadBalancer lb = new BaseLoadBalancer(new DummyPing(), new AvailabilityFilteringRule()); Server badServer = new Server("localhost:12345"); Server badServer1 = new Server("localhost:12346"); @@ -326,7 +423,7 @@ public class NettyClientTest { @Test public void testStream() throws Exception { HttpRequest request = HttpRequest.newBuilder().uri(SERVICE_URI + "testAsync/personStream") - .overrideConfig(new DefaultClientConfigImpl().setTypedProperty(CommonClientConfigKey.Deserializer, JacksonCodec.getInstance())) + .overrideConfig(new DefaultClientConfigImpl().setPropertyWithType(CommonClientConfigKey.Deserializer, JacksonCodec.getInstance())) .build(); RxNettyHttpClient observableClient = new RxNettyHttpClient(); final List result = Lists.newArrayList(); @@ -339,6 +436,32 @@ public class NettyClientTest { assertEquals(EmbeddedResources.entityStream, result); } + @Test + public void testStreamWithLoadBalancer() throws Exception { + IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues().withProperty(CommonClientConfigKey.ConnectTimeout, "1000"); + RxNettyHttpLoadBalancingClient lbObservables = new RxNettyHttpLoadBalancingClient(config); + HttpRequest request = HttpRequest.newBuilder().uri("/testAsync/personStream") + .overrideConfig(new DefaultClientConfigImpl().setPropertyWithType(CommonClientConfigKey.Deserializer, JacksonCodec.getInstance())) + .build(); + final List result = Lists.newArrayList(); + BaseLoadBalancer lb = new BaseLoadBalancer(new DummyPing(), new AvailabilityFilteringRule()); + Server goodServer = new Server("localhost:" + port); + Server badServer = new Server("localhost:12245"); + List servers = Lists.newArrayList(badServer, badServer, badServer, goodServer); + lb.setServersList(servers); + lbObservables.setLoadBalancer(lb); + lbObservables.setMaxAutoRetries(1); + lbObservables.setMaxAutoRetriesNextServer(3); + + lbObservables.createServerSentEventEntityObservable(request, TypeDef.fromClass(Person.class)).toBlockingObservable().forEach(new Action1>() { + @Override + public void call(ServerSentEvent t1) { + result.add(t1.getEntity()); + } + }); + assertEquals(EmbeddedResources.entityStream, result); + } + @Test public void testNoEntity() throws Exception { @@ -403,6 +526,29 @@ public class NettyClientTest { assertNull(p); assertTrue(throwable.get() instanceof UnexpectedHttpResponseException); assertEquals("Service Unavailable", throwable.get().getMessage()); - assertEquals(503, ((UnexpectedHttpResponseException) throwable.get()).getStatusCode()); + UnexpectedHttpResponseException ex = (UnexpectedHttpResponseException) throwable.get(); + assertEquals(503, ex.getStatusCode()); + String body = ex.getResponse().getEntity(String.class); + assertEquals("Rate exceeds limit", body); + } + + @Test + public void testUnexpectedResponse() throws Exception { + URI uri = new URI(SERVICE_URI + "testAsync/throttle"); + HttpRequest request = HttpRequest.newBuilder().uri(uri).build(); + RxNettyHttpClient client = new RxNettyHttpClient(); + Observable responseObservable = client.createFullHttpResponseObservable(request); + final AtomicReference response = new AtomicReference(); + responseObservable.toBlockingObservable().forEach(new Action1() { + + @Override + public void call(HttpResponse t1) { + response.set(t1); + } + + }); + assertEquals(503, response.get().getStatus()); + String body = response.get().getEntity(String.class); + assertEquals("Rate exceeds limit", body); } } diff --git a/ribbon-test/src/main/java/com/netflix/ribbon/test/resources/EmbeddedResources.java b/ribbon-test/src/main/java/com/netflix/ribbon/test/resources/EmbeddedResources.java index 4791297d1218f81a43ac40d512efade7c694ea67..e0f49fb3c1841ff00bae5de853240534051dea7a 100644 --- a/ribbon-test/src/main/java/com/netflix/ribbon/test/resources/EmbeddedResources.java +++ b/ribbon-test/src/main/java/com/netflix/ribbon/test/resources/EmbeddedResources.java @@ -140,7 +140,7 @@ public class EmbeddedResources { @GET @Path("/throttle") public Response throttle() { - return Response.status(Response.Status.SERVICE_UNAVAILABLE).build(); + return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("Rate exceeds limit").build(); } @GET @@ -160,6 +160,12 @@ public class EmbeddedResources { }; } + @GET + @Path("/redirect") + public Response redirect(@QueryParam("port") int port) { + return Response.status(301).header("Location", "http://localhost:" + port + "/testAsync/person").build(); + } + @GET @Path("/personStream") @Produces("text/event-stream")