提交 1cdb043f 编写于 作者: A Allen Wang

Add javadoc to LoadBalancerExecutor.

上级 3401c7be
......@@ -145,7 +145,7 @@ extends LoadBalancerExecutor implements IClient<S, T>, IClientConfigAware {
RequestSpecificRetryHandler<S> handler = getRequestSpecificRetryHandler(request, requestConfig);
try {
return RxUtils.getSingleValueWithRealErrorCause(retryWithLoadBalancer(request.getUri(), toObsevableProvider(callableProvider), handler, request.getLoadBalancerKey()));
return RxUtils.getSingleValueWithRealErrorCause(retryWithLoadBalancer(toObsevableProvider(callableProvider), request.getUri(), handler, request.getLoadBalancerKey()));
} catch (Exception e) {
if (e instanceof ClientException) {
throw (ClientException) e;
......
package com.netflix.client;
import java.util.concurrent.Callable;
import com.netflix.loadbalancer.Server;
......
......@@ -439,9 +439,7 @@ public class LoadBalancerContext implements IClientConfigAware {
* <li> if host is missing but none of the above applies, throws ClientException
*
* @param original Original URI passed from caller
* @return new request with the final URI
*/
@SuppressWarnings("unchecked")
protected Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
String host = null;
int port = -1;
......
......@@ -144,15 +144,42 @@ public class LoadBalancerExecutor extends LoadBalancerContext {
}
public <T> T retryWithLoadBalancer(final URI loadBalancerURI, final ClientCallableProvider<T> clientCallableProvider,
@Nullable final RetryHandler errorHandler, @Nullable final Object loadBalancerKey) throws Exception {
/**
* Retry execution with load balancer with the given {@link ClientCallableProvider} that provides the logic to
* execute network call synchronously with a given {@link Server}.
*
* @param clientCallableProvider interface that provides the logic to execute network call synchronously with a given {@link Server}
* @param loadBalancerURI An optional URI that may contain a real host name and port to use as a fallback to the {@link LoadBalancerExecutor}
* if it does not have a load balancer or cannot find a server from its server list. For example, the URI contains
* "www.google.com:80" will force the {@link LoadBalancerExecutor} to use www.google.com:80 as the actual server to
* carry out the retry execution. See {@link LoadBalancerContext#getServerFromLoadBalancer(URI, Object)}
* @param retryHandler an optional handler to determine the retry logic of the {@link LoadBalancerExecutor}. If null, the default {@link RetryHandler}
* of this {@link LoadBalancerExecutor} will be used.
* @param loadBalancerKey An optional key passed to the load balancer to determine which server to return.
* @throws Exception If any exception happens in the exception
*/
public <T> T retryWithLoadBalancer(final ClientCallableProvider<T> clientCallableProvider, @Nullable final URI loadBalancerURI,
@Nullable final RetryHandler retryHandler, @Nullable final Object loadBalancerKey) throws Exception {
return RxUtils.getSingleValueWithRealErrorCause(
retryWithLoadBalancer(loadBalancerURI, CallableToObservable.toObsevableProvider(clientCallableProvider),
errorHandler, loadBalancerKey));
retryWithLoadBalancer(CallableToObservable.toObsevableProvider(clientCallableProvider), loadBalancerURI,
retryHandler, loadBalancerKey));
}
public <T> Observable<T> retryWithLoadBalancer(final URI loadBalancerURI, final ClientObservableProvider<T> clientObservableProvider,
@Nullable final RetryHandler errorHandler, @Nullable final Object loadBalancerKey) {
/**
* Create an {@link Observable} that retries execution with load balancer with the given {@link ClientObservableProvider} that provides the logic to
* execute network call asynchronously with a given {@link Server}.
*
* @param clientObservableProvider interface that provides the logic to execute network call asynchronously with a given {@link Server}
* @param loadBalancerURI An optional URI that may contain a real host name and port to be used by {@link LoadBalancerExecutor}
* if it does not have a load balancer or cannot find a server from its server list. For example, the URI contains
* "www.google.com:80" will force the {@link LoadBalancerExecutor} to use www.google.com:80 as the actual server to
* carry out the retry execution. See {@link LoadBalancerContext#getServerFromLoadBalancer(URI, Object)}
* @param retryHandler an optional handler to determine the retry logic of the {@link LoadBalancerExecutor}. If null, the default {@link RetryHandler}
* of this {@link LoadBalancerExecutor} will be used.
* @param loadBalancerKey An optional key passed to the load balancer to determine which server to return.
*/
public <T> Observable<T> retryWithLoadBalancer(final ClientObservableProvider<T> clientObservableProvider, @Nullable final URI loadBalancerURI,
@Nullable final RetryHandler retryHandler, @Nullable final Object loadBalancerKey) {
OnSubscribeFunc<T> onSubscribe = new OnSubscribeFunc<T>() {
@Override
public Subscription onSubscribe(final Observer<? super T> t1) {
......@@ -164,15 +191,15 @@ public class LoadBalancerExecutor extends LoadBalancerContext {
t1.onError(e);
return Subscriptions.empty();
}
return retrySameServer(server, clientObservableProvider, errorHandler).subscribe(t1);
return retrySameServer(server, clientObservableProvider, retryHandler).subscribe(t1);
}
};
Observable<T> observable = Observable.create(onSubscribe);
RetryNextServerFunc<T> retryNextServerFunc = new RetryNextServerFunc<T>(loadBalancerURI, onSubscribe, errorHandler);
RetryNextServerFunc<T> retryNextServerFunc = new RetryNextServerFunc<T>(loadBalancerURI, onSubscribe, retryHandler);
return observable.onErrorResumeNext(retryNextServerFunc);
}
public <T> Observable<T> retrySameServer(final Server server, final ClientObservableProvider<T> clientObservableProvider, final RetryHandler errorHandler) {
protected <T> Observable<T> retrySameServer(final Server server, final ClientObservableProvider<T> clientObservableProvider, final RetryHandler errorHandler) {
final ServerStats serverStats = getServerStats(server);
OnSubscribeFunc<T> onSubscribe = new OnSubscribeFunc<T>() {
@Override
......
......@@ -10,7 +10,7 @@ public class RxUtils {
@Override
public Observable<T> call(Throwable t1) {
if (t1 instanceof RuntimeException) {
if ((t1 instanceof RuntimeException) && t1.getCause() != null) {
return Observable.error(t1.getCause());
} else {
return Observable.error(t1);
......
......@@ -89,33 +89,33 @@ public class NettyHttpLoadBalancingClient extends NettyHttpClient {
@Override
public <T> Observable<ServerSentEvent<T>> createServerSentEventEntityObservable(
final HttpRequest request, final TypeDef<T> typeDef, final IClientConfig requestConfig) {
return lbObservables.retryWithLoadBalancer(request.getUri(), new ClientObservableProvider<ServerSentEvent<T>>() {
return lbObservables.retryWithLoadBalancer(new ClientObservableProvider<ServerSentEvent<T>>() {
@Override
public Observable<ServerSentEvent<T>> getObservableForEndpoint(Server server) {
return delegate.createServerSentEventEntityObservable(createRequest(server, request), typeDef, requestConfig);
}
}, new HttpRequestRetryHandler(request, requestConfig, lbObservables.getErrorHandler()), request.getLoadBalancerKey());
}, request.getUri(), new HttpRequestRetryHandler(request, requestConfig, lbObservables.getErrorHandler()), request.getLoadBalancerKey());
}
@Override
public Observable<ObservableHttpResponse<SSEEvent>> createServerSentEventObservable(
final HttpRequest request, final IClientConfig requestConfig) {
return lbObservables.retryWithLoadBalancer(request.getUri(), new ClientObservableProvider<ObservableHttpResponse<SSEEvent>>() {
return lbObservables.retryWithLoadBalancer(new ClientObservableProvider<ObservableHttpResponse<SSEEvent>>() {
@Override
public Observable<ObservableHttpResponse<SSEEvent>> getObservableForEndpoint(Server server) {
return delegate.createServerSentEventObservable(createRequest(server, request), requestConfig);
}
}, new HttpRequestRetryHandler(request, requestConfig, lbObservables.getErrorHandler()), request.getLoadBalancerKey());
}, request.getUri(), new HttpRequestRetryHandler(request, requestConfig, lbObservables.getErrorHandler()), request.getLoadBalancerKey());
}
@Override
public Observable<HttpResponse> createFullHttpResponseObservable(
final HttpRequest request, final IClientConfig requestConfig) {
return lbObservables.retryWithLoadBalancer(request.getUri(), new ClientObservableProvider<HttpResponse>() {
return lbObservables.retryWithLoadBalancer(new ClientObservableProvider<HttpResponse>() {
@Override
public Observable<HttpResponse> getObservableForEndpoint(
......@@ -123,20 +123,20 @@ public class NettyHttpLoadBalancingClient extends NettyHttpClient {
return delegate.createFullHttpResponseObservable(createRequest(server, request), requestConfig);
}
}, new HttpRequestRetryHandler(request, requestConfig, lbObservables.getErrorHandler()), request.getLoadBalancerKey());
}, request.getUri(), new HttpRequestRetryHandler(request, requestConfig, lbObservables.getErrorHandler()), request.getLoadBalancerKey());
}
public <T> Observable<T> createEntityObservable(final HttpRequest request,
final TypeDef<T> typeDef, final IClientConfig requestConfig, @Nullable final RetryHandler retryHandler) {
final RetryHandler handler = retryHandler == null ?
new HttpRequestRetryHandler(request, requestConfig, lbObservables.getErrorHandler()) : retryHandler;
return lbObservables.retryWithLoadBalancer(request.getUri(), new ClientObservableProvider<T>() {
return lbObservables.retryWithLoadBalancer(new ClientObservableProvider<T>() {
@Override
public Observable<T> getObservableForEndpoint(Server server) {
return delegate.createEntityObservable(createRequest(server, request), typeDef, requestConfig);
}
}, handler, request.getLoadBalancerKey());
}, request.getUri(), handler, request.getLoadBalancerKey());
}
......
......@@ -328,61 +328,6 @@ public class NettyClientTest {
assertTrue(personObserver.error instanceof io.netty.handler.timeout.ReadTimeoutException);
}
@Test
public void testSameServerObservable() throws Exception {
IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues().withProperty(CommonClientConfigKey.ConnectTimeout, "1");
final NettyHttpClient observableClient = new NettyHttpClient(config);
final HttpRequest request = HttpRequest.newBuilder().uri("http://www.google.com:81/").build();
BaseLoadBalancer lb = new BaseLoadBalancer(new DummyPing(), new AvailabilityFilteringRule());
LoadBalancerExecutor lbObservables = new LoadBalancerExecutor(lb, config);
lbObservables.setLoadBalancer(lb);
lbObservables.setMaxAutoRetries(2);
// lbObservables.setErrorHandler(new NettyHttpLoadBalancerErrorHandler());
Observable<HttpResponse> observableWithRetries = lbObservables.retrySameServer(new Server(request.getUri().getAuthority()), new ClientObservableProvider<HttpResponse>() {
@Override
public Observable<HttpResponse> getObservableForEndpoint(Server server) {
return observableClient.createEntityObservable(request, TypeDef.fromClass(HttpResponse.class));
}
}, new NettyHttpLoadBalancerErrorHandler(2, 0, true));
ObserverWithLatch<HttpResponse> observer = new ObserverWithLatch<HttpResponse>();
observableWithRetries.subscribe(observer);
observer.await();
ServerStats stats = lbObservables.getServerStats(new Server("www.google.com:81"));
assertEquals(3, stats.getTotalRequestsCount());
assertEquals(0, stats.getActiveRequestsCount());
assertEquals(3, stats.getSuccessiveConnectionFailureCount());
}
@Test
public void testSameServerObservableWithSuccess() throws Exception {
IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues();
final NettyHttpClient observableClient = new NettyHttpClient(config);
final HttpRequest request = HttpRequest.newBuilder().uri("http://www.google.com:80/").build();
BaseLoadBalancer lb = new BaseLoadBalancer(new DummyPing(), new AvailabilityFilteringRule());
LoadBalancerExecutor lbObservables = new LoadBalancerExecutor(lb, config);
lbObservables.setLoadBalancer(lb);
lbObservables.setMaxAutoRetries(1);
// lbObservables.setErrorHandler(new NettyHttpLoadBalancerErrorHandler());
Observable<HttpResponse> observableWithRetries = lbObservables.retrySameServer(new Server(request.getUri().getAuthority()), new ClientObservableProvider<HttpResponse>() {
@Override
public Observable<HttpResponse> getObservableForEndpoint(
Server server) {
return observableClient.createEntityObservable(request, TypeDef.fromClass(HttpResponse.class));
}
}, new NettyHttpLoadBalancerErrorHandler());
ObserverWithLatch<HttpResponse> observer = new ObserverWithLatch<HttpResponse>();
observableWithRetries.subscribe(observer);
observer.await();
assertEquals(200, observer.obj.getStatus());
ServerStats stats = lbObservables.getServerStats(new Server("www.google.com:80"));
assertEquals(1, stats.getTotalRequestsCount());
assertEquals(0, stats.getActiveRequestsCount());
assertEquals(0, stats.getSuccessiveConnectionFailureCount());
}
@Test
public void testObservableWithMultipleServers() throws Exception {
IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues().withProperty(CommonClientConfigKey.ConnectTimeout, "1000");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册