提交 85dc4398 编写于 作者: A allenxwang

Merge pull request #56 from allenxwang/cp

Fix RestClient connection leaks when it throws ClientException upon server throttle. Enhanced JUnit tests for RestClient retries.
......@@ -163,8 +163,8 @@ public abstract class AbstractLoadBalancerAwareClient<S extends ClientRequest, T
T response = null;
do {
S resolved = computeFinalUriWithLoadBalancer(request);
try {
S resolved = computeFinalUriWithLoadBalancer(request);
response = executeOnSingleServer(resolved);
done = true;
} catch (Exception e) {
......@@ -178,7 +178,7 @@ public abstract class AbstractLoadBalancerAwareClient<S extends ClientRequest, T
if (retries > numRetriesNextServer) {
throw new ClientException(
ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
"NUMBER_OF_RETRIES_NEXTSERVER_EXCEEDED :"
"NUMBER_OF_RETRIES_NEXTSERVER_EXCEEDED: "
+ numRetriesNextServer
+ " retries, while making a RestClient call for:"
+ request.getUri() + ":" + getDeepestCause(e).getMessage(), e);
......@@ -186,7 +186,7 @@ public abstract class AbstractLoadBalancerAwareClient<S extends ClientRequest, T
logger.error("Exception while executing request which is deemed retry-able, retrying ..., Next Server Retry Attempt#:"
+ retries
+ ", URI tried:"
+ request.getUri());
+ resolved.getUri());
} else {
if (e instanceof ClientException) {
throw (ClientException) e;
......
......@@ -585,4 +585,12 @@ public abstract class LoadBalancerContext<T extends ClientRequest, S extends IRe
LoadBalancerErrorHandler<? super T, ? super S> errorHandler) {
this.errorHandler = errorHandler;
}
public final boolean isOkToRetryOnAllOperations() {
return okToRetryOnAllOperations;
}
public final void setOkToRetryOnAllOperations(boolean okToRetryOnAllOperations) {
this.okToRetryOnAllOperations = okToRetryOnAllOperations;
}
}
......@@ -31,6 +31,8 @@ import org.apache.http.impl.conn.tsccm.WaitingThreadAborter;
import org.apache.http.params.HttpParams;
import com.google.common.base.Preconditions;
import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.annotations.Monitor;
import com.netflix.servo.monitor.Counter;
import com.netflix.servo.monitor.Monitors;
import com.netflix.servo.monitor.Stopwatch;
......@@ -167,4 +169,10 @@ public class NamedConnectionPool extends ConnPoolByRoute {
public final long getDeleteCount() {
return deleteCounter.getValue().longValue();
}
@Monitor(name="connectionCount", type=DataSourceType.GAUGE)
public int getConnectionCount() {
return this.getConnectionsInPool();
}
}
......@@ -19,6 +19,7 @@ package com.netflix.niws.client.http;
import java.io.File;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
......@@ -604,6 +605,7 @@ public class RestClient extends AbstractLoadBalancerAwareClient<HttpRequest, Htt
thisResponse = new HttpClientResponse(jerseyResponse);
thisResponse.setRequestedURI(uri);
if (thisResponse.getStatus() == 503){
thisResponse.close();
throw new ClientException(ClientException.ErrorType.SERVER_THROTTLED);
}
return thisResponse;
......@@ -611,23 +613,29 @@ public class RestClient extends AbstractLoadBalancerAwareClient<HttpRequest, Htt
@Override
protected boolean isRetriableException(Throwable e) {
boolean shouldRetry = isConnectException(e) || isSocketException(e);
if (e instanceof ClientException
&& ((ClientException)e).getErrorType() == ClientException.ErrorType.SERVER_THROTTLED){
shouldRetry = true;
return false;
}
boolean shouldRetry = isConnectException(e) || isSocketException(e);
return shouldRetry;
}
@Override
protected boolean isCircuitBreakerException(Throwable e) {
if (e instanceof ClientException) {
ClientException clientException = (ClientException) e;
if (clientException.getErrorType() == ClientException.ErrorType.SERVER_THROTTLED) {
return true;
}
}
return isConnectException(e) || isSocketException(e);
}
private static boolean isSocketException(Throwable e) {
int levelCount = 0;
while (e != null && levelCount < 10) {
if (e instanceof SocketException) {
if ((e instanceof SocketException) || (e instanceof SocketTimeoutException)) {
return true;
}
e = e.getCause();
......
/*
*
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.netflix.niws.client.http;
import static org.junit.Assert.*;
import java.net.URI;
import java.util.Random;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.collect.Lists;
import com.netflix.client.ClientException;
import com.netflix.client.ClientFactory;
import com.netflix.client.http.HttpRequest;
import com.netflix.client.http.HttpRequest.Verb;
import com.netflix.client.http.HttpResponse;
import com.netflix.config.ConfigurationManager;
import com.netflix.http4.MonitoredConnectionManager;
import com.netflix.http4.NFHttpClient;
import com.netflix.http4.NFHttpClientFactory;
import com.netflix.loadbalancer.BaseLoadBalancer;
import com.netflix.loadbalancer.DummyPing;
import com.netflix.loadbalancer.Server;
import com.netflix.loadbalancer.ServerStats;
import com.sun.jersey.api.container.httpserver.HttpServerFactory;
import com.sun.jersey.api.core.PackagesResourceConfig;
import com.sun.net.httpserver.HttpServer;
public class RetryTest {
private static HttpServer server = null;
private static String SERVICE_URI;
private static RestClient client;
private static BaseLoadBalancer lb;
private static int port = (new Random()).nextInt(1000) + 4000;
private static Server localServer = new Server("localhost", port);
private static NFHttpClient httpClient;
private static MonitoredConnectionManager connectionPoolManager;
@BeforeClass
public static void init() throws Exception {
PackagesResourceConfig resourceConfig = new PackagesResourceConfig("com.netflix.niws.client.http");
SERVICE_URI = "http://localhost:" + port + "/";
try{
server = HttpServerFactory.create(SERVICE_URI, resourceConfig);
server.start();
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
ConfigurationManager.getConfigInstance().setProperty("RetryTest.ribbon.NFLoadBalancerClassName", BaseLoadBalancer.class.getName());
ConfigurationManager.getConfigInstance().setProperty("RetryTest.ribbon.client.NFLoadBalancerPingClassName", DummyPing.class.getName());
ConfigurationManager.getConfigInstance().setProperty("RetryTest.ribbon.ReadTimeout", "5000");
client = (RestClient) ClientFactory.getNamedClient("RetryTest");
lb = (BaseLoadBalancer) client.getLoadBalancer();
lb.setServersList(Lists.newArrayList(localServer));
httpClient = NFHttpClientFactory.getNamedNFHttpClient("RetryTest");
connectionPoolManager = (MonitoredConnectionManager) httpClient.getConnectionManager();
}
@AfterClass
public static void shutDown() {
server.stop(0);
}
@Before
public void setUp() {
client.setMaxAutoRetries(0);
client.setMaxAutoRetriesNextServer(0);
client.setOkToRetryOnAllOperations(false);
lb.setServersList(Lists.newArrayList(localServer));
lb.getLoadBalancerStats().getSingleServerStat(localServer).clearSuccessiveConnectionFailureCount();
}
@Test
public void testThrottled() throws Exception {
URI localUrl = new URI("/test/get503");
HttpRequest request = HttpRequest.newBuilder().uri(localUrl).build();
try {
client.executeWithLoadBalancer(request);
fail("Exception expected");
} catch (ClientException e) {
assertNotNull(e);
}
assertEquals(1, lb.getLoadBalancerStats().getSingleServerStat(localServer).getSuccessiveConnectionFailureCount());
}
@Test
public void testThrottledWithRetrySameServer() throws Exception {
client.setMaxAutoRetries(2);
URI localUrl = new URI("/test/get503");
HttpRequest request = HttpRequest.newBuilder().uri(localUrl).build();
try {
client.executeWithLoadBalancer(request);
fail("Exception expected");
} catch (ClientException e) { // NOPMD
}
assertEquals(1, lb.getLoadBalancerStats().getSingleServerStat(localServer).getSuccessiveConnectionFailureCount());
}
@Test
public void testThrottledWithRetryNextServer() throws Exception {
int connectionCount = connectionPoolManager.getConnectionsInPool();
client.setMaxAutoRetriesNextServer(2);
URI localUrl = new URI("/test/get503");
HttpRequest request = HttpRequest.newBuilder().uri(localUrl).build();
try {
client.executeWithLoadBalancer(request);
fail("Exception expected");
} catch (ClientException e) { // NOPMD
}
assertEquals(3, lb.getLoadBalancerStats().getSingleServerStat(localServer).getSuccessiveConnectionFailureCount());
System.out.println("Initial connections count " + connectionCount);
System.out.println("Final connections count " + connectionPoolManager.getConnectionsInPool());
// should be no connection leak
assertTrue(connectionPoolManager.getConnectionsInPool() <= connectionCount + 1);
}
@Test
public void testReadTimeout() throws Exception {
client.setMaxAutoRetriesNextServer(2);
URI localUrl = new URI("/test/getReadtimeout");
HttpRequest request = HttpRequest.newBuilder().uri(localUrl).build();
try {
client.executeWithLoadBalancer(request);
fail("Exception expected");
} catch (ClientException e) { // NOPMD
}
assertEquals(3, lb.getLoadBalancerStats().getSingleServerStat(localServer).getSuccessiveConnectionFailureCount());
}
@Test
public void testReadTimeoutWithRetriesNextServe() throws Exception {
client.setMaxAutoRetriesNextServer(2);
URI localUrl = new URI("/test/getReadtimeout");
HttpRequest request = HttpRequest.newBuilder().uri(localUrl).build();
try {
client.executeWithLoadBalancer(request);
fail("Exception expected");
} catch (ClientException e) { // NOPMD
}
assertEquals(3, lb.getLoadBalancerStats().getSingleServerStat(localServer).getSuccessiveConnectionFailureCount());
}
@Test
public void postReadTimeout() throws Exception {
client.setMaxAutoRetriesNextServer(2);
URI localUrl = new URI("/test/postReadtimeout");
HttpRequest request = HttpRequest.newBuilder().uri(localUrl).verb(Verb.POST).build();
try {
client.executeWithLoadBalancer(request);
fail("Exception expected");
} catch (ClientException e) { // NOPMD
}
ServerStats stats = lb.getLoadBalancerStats().getSingleServerStat(localServer);
assertEquals(1, stats.getSuccessiveConnectionFailureCount());
}
@Test
public void testAggressiveRetries() throws Exception {
client.setMaxAutoRetriesNextServer(2);
client.setOkToRetryOnAllOperations(true);
URI localUrl = new URI("/test/postReadtimeout");
HttpRequest request = HttpRequest.newBuilder().uri(localUrl).verb(Verb.POST).build();
try {
client.executeWithLoadBalancer(request);
fail("Exception expected");
} catch (ClientException e) { // NOPMD
}
ServerStats stats = lb.getLoadBalancerStats().getSingleServerStat(localServer);
assertEquals(3, stats.getSuccessiveConnectionFailureCount());
}
@Test
public void testSuccessfulRetries() throws Exception {
lb.setServersList(Lists.newArrayList(new Server("localhost:12987"), new Server("localhost:12987"), localServer));
client.setMaxAutoRetriesNextServer(2);
URI localUrl = new URI("/test/getObject");
HttpRequest request = HttpRequest.newBuilder().uri(localUrl).queryParams("name", "ribbon").build();
try {
HttpResponse response = client.executeWithLoadBalancer(request);
assertEquals(200, response.getStatus());
} catch (ClientException e) {
fail("Unexpected exception");
}
ServerStats stats = lb.getLoadBalancerStats().getSingleServerStat(new Server("localhost:12987"));
assertEquals(1, stats.getSuccessiveConnectionFailureCount());
}
}
......@@ -62,4 +62,38 @@ public class TestResource {
return Response.serverError().build();
}
}
@GET
@Path("/get503")
public Response get503() {
return Response.status(503).build();
}
@GET
@Path("/get500")
public Response get500() {
return Response.status(500).build();
}
@GET
@Path("/getReadtimeout")
public Response getReadtimeout() {
try {
Thread.sleep(10000);
} catch (Exception e) { // NOPMD
}
return Response.ok().build();
}
@POST
@Path("/postReadtimeout")
public Response postReadtimeout() {
try {
Thread.sleep(10000);
} catch (Exception e) { // NOPMD
}
return Response.ok().build();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册