diff --git a/ribbon-loadbalancer/src/main/java/com/netflix/client/AbstractLoadBalancerAwareClient.java b/ribbon-loadbalancer/src/main/java/com/netflix/client/AbstractLoadBalancerAwareClient.java index be8d1ad2eb4dc8725b8d205d1dcd715fe2b144cd..018b410caf6228d6d8de1f9cec0dd1b60f64dcde 100644 --- a/ribbon-loadbalancer/src/main/java/com/netflix/client/AbstractLoadBalancerAwareClient.java +++ b/ribbon-loadbalancer/src/main/java/com/netflix/client/AbstractLoadBalancerAwareClient.java @@ -145,7 +145,7 @@ extends LoadBalancerExecutor implements IClient, IClientConfigAware { RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, requestConfig); try { - return RxUtils.getSingleValueWithRealErrorCause(retryWithLoadBalancer(request.getUri(), toObsevableProvider(callableProvider), handler, request.getLoadBalancerKey())); + return RxUtils.getSingleValueWithRealErrorCause(retryWithLoadBalancer(toObsevableProvider(callableProvider), request.getUri(), handler, request.getLoadBalancerKey())); } catch (Exception e) { if (e instanceof ClientException) { throw (ClientException) e; diff --git a/ribbon-loadbalancer/src/main/java/com/netflix/client/ClientCallableProvider.java b/ribbon-loadbalancer/src/main/java/com/netflix/client/ClientCallableProvider.java index f287c73baf1391466cc58057781430a9cd4b537b..1fd8e5b4220163527ec7a0c696f14c625c8aa184 100644 --- a/ribbon-loadbalancer/src/main/java/com/netflix/client/ClientCallableProvider.java +++ b/ribbon-loadbalancer/src/main/java/com/netflix/client/ClientCallableProvider.java @@ -1,6 +1,5 @@ package com.netflix.client; -import java.util.concurrent.Callable; import com.netflix.loadbalancer.Server; diff --git a/ribbon-loadbalancer/src/main/java/com/netflix/client/LoadBalancerContext.java b/ribbon-loadbalancer/src/main/java/com/netflix/client/LoadBalancerContext.java index d32e2a6c40f335d9c03befff7a4d484ed9961046..67f878068457b771cc78e85651caeabf24032bc5 100644 --- a/ribbon-loadbalancer/src/main/java/com/netflix/client/LoadBalancerContext.java +++ b/ribbon-loadbalancer/src/main/java/com/netflix/client/LoadBalancerContext.java @@ -439,9 +439,7 @@ public class LoadBalancerContext implements IClientConfigAware { *
  • if host is missing but none of the above applies, throws ClientException * * @param original Original URI passed from caller - * @return new request with the final URI */ - @SuppressWarnings("unchecked") protected Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException { String host = null; int port = -1; diff --git a/ribbon-loadbalancer/src/main/java/com/netflix/client/LoadBalancerExecutor.java b/ribbon-loadbalancer/src/main/java/com/netflix/client/LoadBalancerExecutor.java index a094542377082aad321578dcf661f28b2394b168..474131bd39d8e3bb506be3187a637a3b1cbf7830 100644 --- a/ribbon-loadbalancer/src/main/java/com/netflix/client/LoadBalancerExecutor.java +++ b/ribbon-loadbalancer/src/main/java/com/netflix/client/LoadBalancerExecutor.java @@ -144,15 +144,42 @@ public class LoadBalancerExecutor extends LoadBalancerContext { } - public T retryWithLoadBalancer(final URI loadBalancerURI, final ClientCallableProvider clientCallableProvider, - @Nullable final RetryHandler errorHandler, @Nullable final Object loadBalancerKey) throws Exception { + /** + * Retry execution with load balancer with the given {@link ClientCallableProvider} that provides the logic to + * execute network call synchronously with a given {@link Server}. + * + * @param clientCallableProvider interface that provides the logic to execute network call synchronously with a given {@link Server} + * @param loadBalancerURI An optional URI that may contain a real host name and port to use as a fallback to the {@link LoadBalancerExecutor} + * if it does not have a load balancer or cannot find a server from its server list. For example, the URI contains + * "www.google.com:80" will force the {@link LoadBalancerExecutor} to use www.google.com:80 as the actual server to + * carry out the retry execution. See {@link LoadBalancerContext#getServerFromLoadBalancer(URI, Object)} + * @param retryHandler an optional handler to determine the retry logic of the {@link LoadBalancerExecutor}. If null, the default {@link RetryHandler} + * of this {@link LoadBalancerExecutor} will be used. + * @param loadBalancerKey An optional key passed to the load balancer to determine which server to return. + * @throws Exception If any exception happens in the exception + */ + public T retryWithLoadBalancer(final ClientCallableProvider clientCallableProvider, @Nullable final URI loadBalancerURI, + @Nullable final RetryHandler retryHandler, @Nullable final Object loadBalancerKey) throws Exception { return RxUtils.getSingleValueWithRealErrorCause( - retryWithLoadBalancer(loadBalancerURI, CallableToObservable.toObsevableProvider(clientCallableProvider), - errorHandler, loadBalancerKey)); + retryWithLoadBalancer(CallableToObservable.toObsevableProvider(clientCallableProvider), loadBalancerURI, + retryHandler, loadBalancerKey)); } - public Observable retryWithLoadBalancer(final URI loadBalancerURI, final ClientObservableProvider clientObservableProvider, - @Nullable final RetryHandler errorHandler, @Nullable final Object loadBalancerKey) { + /** + * Create an {@link Observable} that retries execution with load balancer with the given {@link ClientObservableProvider} that provides the logic to + * execute network call asynchronously with a given {@link Server}. + * + * @param clientObservableProvider interface that provides the logic to execute network call asynchronously with a given {@link Server} + * @param loadBalancerURI An optional URI that may contain a real host name and port to be used by {@link LoadBalancerExecutor} + * if it does not have a load balancer or cannot find a server from its server list. For example, the URI contains + * "www.google.com:80" will force the {@link LoadBalancerExecutor} to use www.google.com:80 as the actual server to + * carry out the retry execution. See {@link LoadBalancerContext#getServerFromLoadBalancer(URI, Object)} + * @param retryHandler an optional handler to determine the retry logic of the {@link LoadBalancerExecutor}. If null, the default {@link RetryHandler} + * of this {@link LoadBalancerExecutor} will be used. + * @param loadBalancerKey An optional key passed to the load balancer to determine which server to return. + */ + public Observable retryWithLoadBalancer(final ClientObservableProvider clientObservableProvider, @Nullable final URI loadBalancerURI, + @Nullable final RetryHandler retryHandler, @Nullable final Object loadBalancerKey) { OnSubscribeFunc onSubscribe = new OnSubscribeFunc() { @Override public Subscription onSubscribe(final Observer t1) { @@ -164,15 +191,15 @@ public class LoadBalancerExecutor extends LoadBalancerContext { t1.onError(e); return Subscriptions.empty(); } - return retrySameServer(server, clientObservableProvider, errorHandler).subscribe(t1); + return retrySameServer(server, clientObservableProvider, retryHandler).subscribe(t1); } }; Observable observable = Observable.create(onSubscribe); - RetryNextServerFunc retryNextServerFunc = new RetryNextServerFunc(loadBalancerURI, onSubscribe, errorHandler); + RetryNextServerFunc retryNextServerFunc = new RetryNextServerFunc(loadBalancerURI, onSubscribe, retryHandler); return observable.onErrorResumeNext(retryNextServerFunc); } - public Observable retrySameServer(final Server server, final ClientObservableProvider clientObservableProvider, final RetryHandler errorHandler) { + protected Observable retrySameServer(final Server server, final ClientObservableProvider clientObservableProvider, final RetryHandler errorHandler) { final ServerStats serverStats = getServerStats(server); OnSubscribeFunc onSubscribe = new OnSubscribeFunc() { @Override diff --git a/ribbon-loadbalancer/src/main/java/com/netflix/utils/RxUtils.java b/ribbon-loadbalancer/src/main/java/com/netflix/utils/RxUtils.java index d422a6c882562db2d9b736dea6148b93e6fa425f..3e2ef6be8de492667a93e2ef8213904cb4ac2583 100644 --- a/ribbon-loadbalancer/src/main/java/com/netflix/utils/RxUtils.java +++ b/ribbon-loadbalancer/src/main/java/com/netflix/utils/RxUtils.java @@ -10,7 +10,7 @@ public class RxUtils { @Override public Observable call(Throwable t1) { - if (t1 instanceof RuntimeException) { + if ((t1 instanceof RuntimeException) && t1.getCause() != null) { return Observable.error(t1.getCause()); } else { return Observable.error(t1); diff --git a/ribbon-rxnetty/src/main/java/com/netflix/client/netty/http/NettyHttpLoadBalancingClient.java b/ribbon-rxnetty/src/main/java/com/netflix/client/netty/http/NettyHttpLoadBalancingClient.java index c3f18193ad8b51643eea519578bcaa0a8a5ab389..388744d38431599047f6b302e48d86b7a6a19a54 100644 --- a/ribbon-rxnetty/src/main/java/com/netflix/client/netty/http/NettyHttpLoadBalancingClient.java +++ b/ribbon-rxnetty/src/main/java/com/netflix/client/netty/http/NettyHttpLoadBalancingClient.java @@ -89,33 +89,33 @@ public class NettyHttpLoadBalancingClient extends NettyHttpClient { @Override public Observable> createServerSentEventEntityObservable( final HttpRequest request, final TypeDef typeDef, final IClientConfig requestConfig) { - return lbObservables.retryWithLoadBalancer(request.getUri(), new ClientObservableProvider>() { + return lbObservables.retryWithLoadBalancer(new ClientObservableProvider>() { @Override public Observable> getObservableForEndpoint(Server server) { return delegate.createServerSentEventEntityObservable(createRequest(server, request), typeDef, requestConfig); } - }, new HttpRequestRetryHandler(request, requestConfig, lbObservables.getErrorHandler()), request.getLoadBalancerKey()); + }, request.getUri(), new HttpRequestRetryHandler(request, requestConfig, lbObservables.getErrorHandler()), request.getLoadBalancerKey()); } @Override public Observable> createServerSentEventObservable( final HttpRequest request, final IClientConfig requestConfig) { - return lbObservables.retryWithLoadBalancer(request.getUri(), new ClientObservableProvider>() { + return lbObservables.retryWithLoadBalancer(new ClientObservableProvider>() { @Override public Observable> getObservableForEndpoint(Server server) { return delegate.createServerSentEventObservable(createRequest(server, request), requestConfig); } - }, new HttpRequestRetryHandler(request, requestConfig, lbObservables.getErrorHandler()), request.getLoadBalancerKey()); + }, request.getUri(), new HttpRequestRetryHandler(request, requestConfig, lbObservables.getErrorHandler()), request.getLoadBalancerKey()); } @Override public Observable createFullHttpResponseObservable( final HttpRequest request, final IClientConfig requestConfig) { - return lbObservables.retryWithLoadBalancer(request.getUri(), new ClientObservableProvider() { + return lbObservables.retryWithLoadBalancer(new ClientObservableProvider() { @Override public Observable getObservableForEndpoint( @@ -123,20 +123,20 @@ public class NettyHttpLoadBalancingClient extends NettyHttpClient { return delegate.createFullHttpResponseObservable(createRequest(server, request), requestConfig); } - }, new HttpRequestRetryHandler(request, requestConfig, lbObservables.getErrorHandler()), request.getLoadBalancerKey()); + }, request.getUri(), new HttpRequestRetryHandler(request, requestConfig, lbObservables.getErrorHandler()), request.getLoadBalancerKey()); } public Observable createEntityObservable(final HttpRequest request, final TypeDef typeDef, final IClientConfig requestConfig, @Nullable final RetryHandler retryHandler) { final RetryHandler handler = retryHandler == null ? new HttpRequestRetryHandler(request, requestConfig, lbObservables.getErrorHandler()) : retryHandler; - return lbObservables.retryWithLoadBalancer(request.getUri(), new ClientObservableProvider() { + return lbObservables.retryWithLoadBalancer(new ClientObservableProvider() { @Override public Observable getObservableForEndpoint(Server server) { return delegate.createEntityObservable(createRequest(server, request), typeDef, requestConfig); } - }, handler, request.getLoadBalancerKey()); + }, request.getUri(), handler, request.getLoadBalancerKey()); } diff --git a/ribbon-rxnetty/src/test/java/com/netflix/client/netty/NettyClientTest.java b/ribbon-rxnetty/src/test/java/com/netflix/client/netty/NettyClientTest.java index 5d68d315cf1d19358af9f25c96f7bda7ef427828..525d1fbc01627ed881036932f30ac10528afe201 100644 --- a/ribbon-rxnetty/src/test/java/com/netflix/client/netty/NettyClientTest.java +++ b/ribbon-rxnetty/src/test/java/com/netflix/client/netty/NettyClientTest.java @@ -328,61 +328,6 @@ public class NettyClientTest { assertTrue(personObserver.error instanceof io.netty.handler.timeout.ReadTimeoutException); } - - @Test - public void testSameServerObservable() throws Exception { - IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues().withProperty(CommonClientConfigKey.ConnectTimeout, "1"); - final NettyHttpClient observableClient = new NettyHttpClient(config); - final HttpRequest request = HttpRequest.newBuilder().uri("http://www.google.com:81/").build(); - BaseLoadBalancer lb = new BaseLoadBalancer(new DummyPing(), new AvailabilityFilteringRule()); - LoadBalancerExecutor lbObservables = new LoadBalancerExecutor(lb, config); - lbObservables.setLoadBalancer(lb); - lbObservables.setMaxAutoRetries(2); - // lbObservables.setErrorHandler(new NettyHttpLoadBalancerErrorHandler()); - Observable observableWithRetries = lbObservables.retrySameServer(new Server(request.getUri().getAuthority()), new ClientObservableProvider() { - - @Override - public Observable getObservableForEndpoint(Server server) { - return observableClient.createEntityObservable(request, TypeDef.fromClass(HttpResponse.class)); - } - }, new NettyHttpLoadBalancerErrorHandler(2, 0, true)); - ObserverWithLatch observer = new ObserverWithLatch(); - observableWithRetries.subscribe(observer); - observer.await(); - ServerStats stats = lbObservables.getServerStats(new Server("www.google.com:81")); - assertEquals(3, stats.getTotalRequestsCount()); - assertEquals(0, stats.getActiveRequestsCount()); - assertEquals(3, stats.getSuccessiveConnectionFailureCount()); - } - - @Test - public void testSameServerObservableWithSuccess() throws Exception { - IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues(); - final NettyHttpClient observableClient = new NettyHttpClient(config); - final HttpRequest request = HttpRequest.newBuilder().uri("http://www.google.com:80/").build(); - BaseLoadBalancer lb = new BaseLoadBalancer(new DummyPing(), new AvailabilityFilteringRule()); - LoadBalancerExecutor lbObservables = new LoadBalancerExecutor(lb, config); - lbObservables.setLoadBalancer(lb); - lbObservables.setMaxAutoRetries(1); - // lbObservables.setErrorHandler(new NettyHttpLoadBalancerErrorHandler()); - Observable observableWithRetries = lbObservables.retrySameServer(new Server(request.getUri().getAuthority()), new ClientObservableProvider() { - - @Override - public Observable getObservableForEndpoint( - Server server) { - return observableClient.createEntityObservable(request, TypeDef.fromClass(HttpResponse.class)); - } - }, new NettyHttpLoadBalancerErrorHandler()); - ObserverWithLatch observer = new ObserverWithLatch(); - observableWithRetries.subscribe(observer); - observer.await(); - assertEquals(200, observer.obj.getStatus()); - ServerStats stats = lbObservables.getServerStats(new Server("www.google.com:80")); - assertEquals(1, stats.getTotalRequestsCount()); - assertEquals(0, stats.getActiveRequestsCount()); - assertEquals(0, stats.getSuccessiveConnectionFailureCount()); - } - @Test public void testObservableWithMultipleServers() throws Exception { IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues().withProperty(CommonClientConfigKey.ConnectTimeout, "1000");