From 6177e44d9177d1945770bf5eb2cda4b77b99b007 Mon Sep 17 00:00:00 2001 From: Allen Wang Date: Mon, 14 Apr 2014 18:47:11 -0700 Subject: [PATCH] Fix Issue #28 --- .../com/netflix/client/PrimeConnections.java | 3 +- .../loadbalancer/BaseLoadBalancer.java | 9 ++++ .../DynamicServerListLoadBalancer.java | 10 ++++ ...uledThreadPoolExectuorWithDynamicSize.java | 39 ++++++++++++++ .../netflix/http4/ConnectionPoolCleaner.java | 51 ++++++++++--------- .../http4/MonitoredConnectionManager.java | 2 - .../java/com/netflix/http4/NFHttpClient.java | 28 ++++++++-- .../netflix/http4/NFHttpClientFactory.java | 7 ++- .../netflix/http4/NamedConnectionPool.java | 7 +++ .../netflix/niws/client/http/RestClient.java | 17 ++++--- 10 files changed, 132 insertions(+), 41 deletions(-) create mode 100644 ribbon-core/src/main/java/com/netflix/utils/ScheduledThreadPoolExectuorWithDynamicSize.java diff --git a/ribbon-core/src/main/java/com/netflix/client/PrimeConnections.java b/ribbon-core/src/main/java/com/netflix/client/PrimeConnections.java index 87b672e..a52cb31 100644 --- a/ribbon-core/src/main/java/com/netflix/client/PrimeConnections.java +++ b/ribbon-core/src/main/java/com/netflix/client/PrimeConnections.java @@ -310,8 +310,9 @@ public class PrimeConnections { return executorService.submit(ftConn); } - void shutdown() { + public void shutdown() { executorService.shutdown(); + Monitors.unregisterObject(name + "_PrimeConnection", this); } private Boolean connectToServer(final Server s, final PrimeConnectionListener listener) { diff --git a/ribbon-core/src/main/java/com/netflix/loadbalancer/BaseLoadBalancer.java b/ribbon-core/src/main/java/com/netflix/loadbalancer/BaseLoadBalancer.java index ebc8fcc..0e97fc5 100644 --- a/ribbon-core/src/main/java/com/netflix/loadbalancer/BaseLoadBalancer.java +++ b/ribbon-core/src/main/java/com/netflix/loadbalancer/BaseLoadBalancer.java @@ -855,4 +855,13 @@ public class BaseLoadBalancer extends AbstractLoadBalancer implements boolean enablePrimingConnections) { this.enablePrimingConnections = enablePrimingConnections; } + + public void shutdown() { + cancelPingTask(); + if (primeConnections != null) { + primeConnections.shutdown(); + } + Monitors.unregisterObject("LoadBalancer_" + name, this); + Monitors.unregisterObject("Rule_" + name, this.getRule()); + } } diff --git a/ribbon-core/src/main/java/com/netflix/loadbalancer/DynamicServerListLoadBalancer.java b/ribbon-core/src/main/java/com/netflix/loadbalancer/DynamicServerListLoadBalancer.java index 0937937..b6e472a 100644 --- a/ribbon-core/src/main/java/com/netflix/loadbalancer/DynamicServerListLoadBalancer.java +++ b/ribbon-core/src/main/java/com/netflix/loadbalancer/DynamicServerListLoadBalancer.java @@ -278,6 +278,9 @@ public class DynamicServerListLoadBalancer extends public void run() { if (!serverRefreshEnabled) { + if (scheduledFuture != null) { + scheduledFuture.cancel(true); + } return; } try { @@ -353,10 +356,17 @@ public class DynamicServerListLoadBalancer extends } } + @Override public String toString() { StringBuilder sb = new StringBuilder("DynamicServerListLoadBalancer:"); sb.append(super.toString()); sb.append("ServerList:" + String.valueOf(serverListImpl)); return sb.toString(); } + + @Override + public void shutdown() { + super.shutdown(); + stopServerListRefreshing(); + } } diff --git a/ribbon-core/src/main/java/com/netflix/utils/ScheduledThreadPoolExectuorWithDynamicSize.java b/ribbon-core/src/main/java/com/netflix/utils/ScheduledThreadPoolExectuorWithDynamicSize.java new file mode 100644 index 0000000..1b973ef --- /dev/null +++ b/ribbon-core/src/main/java/com/netflix/utils/ScheduledThreadPoolExectuorWithDynamicSize.java @@ -0,0 +1,39 @@ +package com.netflix.utils; + +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; + +import com.netflix.config.DynamicIntProperty; + +/** + * A {@link ScheduledThreadPoolExecutor} whose core size can be dynamically changed by a given {@link DynamicIntProperty} and + * registers itself with a shutdown hook to shut down. + * + * @author awang + * + */ +public class ScheduledThreadPoolExectuorWithDynamicSize extends ScheduledThreadPoolExecutor { + + private final Thread shutdownThread; + + public ScheduledThreadPoolExectuorWithDynamicSize(final DynamicIntProperty corePoolSize, ThreadFactory threadFactory) { + super(corePoolSize.get(), threadFactory); + corePoolSize.addCallback(new Runnable() { + public void run() { + setCorePoolSize(corePoolSize.get()); + } + }); + shutdownThread = new Thread(new Runnable() { + public void run() { + shutdown(); + if (shutdownThread != null) { + try { + Runtime.getRuntime().removeShutdownHook(shutdownThread); + } catch (IllegalStateException ise) { // NOPMD + } + } + } + }); + Runtime.getRuntime().addShutdownHook(shutdownThread); + } +} diff --git a/ribbon-httpclient/src/main/java/com/netflix/http4/ConnectionPoolCleaner.java b/ribbon-httpclient/src/main/java/com/netflix/http4/ConnectionPoolCleaner.java index e1f5deb..42546df 100644 --- a/ribbon-httpclient/src/main/java/com/netflix/http4/ConnectionPoolCleaner.java +++ b/ribbon-httpclient/src/main/java/com/netflix/http4/ConnectionPoolCleaner.java @@ -17,8 +17,8 @@ */ package com.netflix.http4; -import java.util.Timer; -import java.util.TimerTask; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.apache.http.conn.ClientConnectionManager; @@ -40,19 +40,18 @@ public class ConnectionPoolCleaner { String name = "default"; ClientConnectionManager connMgr; - Timer timer; - - + ScheduledExecutorService scheduler; private DynamicIntProperty connIdleEvictTimeMilliSeconds = DynamicPropertyFactory.getInstance().getIntProperty("default.nfhttpclient.connIdleEvictTimeMilliSeconds", NFHttpClientConstants.DEFAULT_CONNECTIONIDLE_TIME_IN_MSECS); - boolean enableConnectionPoolCleanerTask = false; + volatile boolean enableConnectionPoolCleanerTask = false; long connectionCleanerTimerDelay = 10; long connectionCleanerRepeatInterval = NFHttpClientConstants.DEFAULT_CONNECTION_IDLE_TIMERTASK_REPEAT_IN_MSECS; + private volatile ScheduledFuture scheduledFuture; - public ConnectionPoolCleaner(String name, ClientConnectionManager connMgr){ + public ConnectionPoolCleaner(String name, ClientConnectionManager connMgr, ScheduledExecutorService scheduler){ this.name = name; this.connMgr = connMgr; } @@ -94,28 +93,22 @@ public class ConnectionPoolCleaner { } public void initTask(){ - if (enableConnectionPoolCleanerTask){ - timer = new Timer(name + "-ConnectionPoolCleanerThread", true); - timer.schedule(new TimerTask() { - - public void run() { - try { + if (enableConnectionPoolCleanerTask) { + scheduledFuture = scheduler.scheduleWithFixedDelay(new Runnable() { + public void run() { + try { + if (enableConnectionPoolCleanerTask) { + logger.debug("Connection pool clean up started for client {}", name); cleanupConnections(); - } catch (Throwable e) { - logger.error("Exception in ConnectionPoolCleanerThread",e); - //e.printStackTrace(); + } else if (scheduledFuture != null) { + scheduledFuture.cancel(true); } + } catch (Throwable e) { + logger.error("Exception in ConnectionPoolCleanerThread",e); } - }, connectionCleanerTimerDelay, connectionCleanerRepeatInterval); - logger.info("Initializing ConnectionPoolCleaner for NFHttpClient:" + name); - // Add it to the shutdown hook - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - public void run() - { - logger.info("Stopping the ConnectionPoolCleaner Update Task"); - timer.cancel(); } - })); + }, connectionCleanerTimerDelay, connectionCleanerRepeatInterval, TimeUnit.MILLISECONDS); + logger.info("Initializing ConnectionPoolCleaner for NFHttpClient:" + name); } } @@ -124,6 +117,13 @@ public class ConnectionPoolCleaner { connMgr.closeIdleConnections(connIdleEvictTimeMilliSeconds.get(), TimeUnit.MILLISECONDS); } + public void shutdown() { + enableConnectionPoolCleanerTask = false; + if (scheduledFuture != null) { + scheduledFuture.cancel(true); + } + } + public String toString(){ StringBuilder sb = new StringBuilder(); @@ -136,4 +136,5 @@ public class ConnectionPoolCleaner { } + } diff --git a/ribbon-httpclient/src/main/java/com/netflix/http4/MonitoredConnectionManager.java b/ribbon-httpclient/src/main/java/com/netflix/http4/MonitoredConnectionManager.java index a381aa4..0efb188 100644 --- a/ribbon-httpclient/src/main/java/com/netflix/http4/MonitoredConnectionManager.java +++ b/ribbon-httpclient/src/main/java/com/netflix/http4/MonitoredConnectionManager.java @@ -83,6 +83,4 @@ public class MonitoredConnectionManager extends ThreadSafeClientConnManager { // TODO Auto-generated method stub return super.requestConnection(route, state); } - - } diff --git a/ribbon-httpclient/src/main/java/com/netflix/http4/NFHttpClient.java b/ribbon-httpclient/src/main/java/com/netflix/http4/NFHttpClient.java index b6ec894..03a6e08 100644 --- a/ribbon-httpclient/src/main/java/com/netflix/http4/NFHttpClient.java +++ b/ribbon-httpclient/src/main/java/com/netflix/http4/NFHttpClient.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -44,6 +46,7 @@ import org.apache.http.protocol.HttpContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.netflix.client.config.CommonClientConfigKey; import com.netflix.client.config.DefaultClientConfigImpl; import com.netflix.client.config.IClientConfig; @@ -54,6 +57,7 @@ import com.netflix.servo.annotations.Monitor; import com.netflix.servo.monitor.Monitors; import com.netflix.servo.monitor.Stopwatch; import com.netflix.servo.monitor.Timer; +import com.netflix.utils.ScheduledThreadPoolExectuorWithDynamicSize; /** * Netflix extension of Apache 4.0 HttpClient @@ -67,7 +71,11 @@ public class NFHttpClient extends DefaultHttpClient { private static final Logger LOGGER = LoggerFactory.getLogger(NFHttpClient.class); protected static final String EXECUTE_TRACER = "HttpClient-ExecuteTimer"; - + + private static final DynamicIntProperty CORE_SIZE = new DynamicIntProperty("NFHttpClient.connectionPoolCleanerNumberCoreThreads", 2); + + private static ScheduledExecutorService connectionPoolCleanUpScheduler; + private HttpHost httpHost = null; private HttpRoute httpRoute = null; @@ -87,6 +95,13 @@ public class NFHttpClient extends DefaultHttpClient { private DynamicIntProperty maxTotalConnectionProperty; private DynamicIntProperty maxConnectionPerHostProperty; + static { + ThreadFactory factory = (new ThreadFactoryBuilder()).setDaemon(true) + .setNameFormat("Connection pool clean up thread") + .build(); + connectionPoolCleanUpScheduler = new ScheduledThreadPoolExectuorWithDynamicSize(CORE_SIZE, factory); + } + protected NFHttpClient(String host, int port){ super(new ThreadSafeClientConnManager()); this.name = "UNNAMED_" + numNonNamedHttpClients.incrementAndGet(); @@ -128,7 +143,7 @@ public class NFHttpClient extends DefaultHttpClient { defaultHeaders.add(new BasicHeader("X-netflix-httpclientname", name)); params.setParameter(ClientPNames.DEFAULT_HEADERS, defaultHeaders); - connPoolCleaner = new ConnectionPoolCleaner(name, this.getConnectionManager()); + connPoolCleaner = new ConnectionPoolCleaner(name, this.getConnectionManager(), connectionPoolCleanUpScheduler); this.retriesProperty = DynamicPropertyFactory.getInstance().getIntProperty(this.name + ".nfhttpclient" + ".retries", 3); this.sleepTimeFactorMsProperty = DynamicPropertyFactory.getInstance().getIntProperty(this.name + ".nfhttpclient"+ ".sleepTimeFactorMs", 10); @@ -292,4 +307,11 @@ public class NFHttpClient extends DefaultHttpClient { sw.stop(); } } -} \ No newline at end of file + + public void shutdown() { + if (connPoolCleaner != null) { + connPoolCleaner.shutdown(); + } + getConnectionManager().shutdown(); + } +} diff --git a/ribbon-httpclient/src/main/java/com/netflix/http4/NFHttpClientFactory.java b/ribbon-httpclient/src/main/java/com/netflix/http4/NFHttpClientFactory.java index b7539b2..286b883 100644 --- a/ribbon-httpclient/src/main/java/com/netflix/http4/NFHttpClientFactory.java +++ b/ribbon-httpclient/src/main/java/com/netflix/http4/NFHttpClientFactory.java @@ -21,7 +21,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.collections.keyvalue.MultiKey; -import org.apache.http.client.HttpClient; import com.netflix.client.config.DefaultClientConfigImpl; import com.netflix.client.config.IClientConfig; @@ -89,9 +88,9 @@ public class NFHttpClientFactory { } public static void shutdownNFHttpClient(String name) { - HttpClient c = namedClientMap.get(name); - if(c != null) { - c.getConnectionManager().shutdown(); + NFHttpClient c = namedClientMap.get(name); + if (c != null) { + c.shutdown(); namedClientMap.remove(name); Monitors.unregisterObject(name, c); } diff --git a/ribbon-httpclient/src/main/java/com/netflix/http4/NamedConnectionPool.java b/ribbon-httpclient/src/main/java/com/netflix/http4/NamedConnectionPool.java index b2c45f2..da62a74 100644 --- a/ribbon-httpclient/src/main/java/com/netflix/http4/NamedConnectionPool.java +++ b/ribbon-httpclient/src/main/java/com/netflix/http4/NamedConnectionPool.java @@ -55,6 +55,7 @@ public class NamedConnectionPool extends ConnPoolByRoute { private Counter deleteCounter; private Timer requestTimer; private Timer creationTimer; + private String name; public NamedConnectionPool(String name, ClientConnectionOperator operator, ConnPerRoute connPerRoute, int maxTotalConnections, long connTTL, @@ -100,6 +101,7 @@ public class NamedConnectionPool extends ConnPoolByRoute { deleteCounter = Monitors.newCounter(name + "_Delete"); requestTimer = Monitors.newTimer(name + "_RequestConnectionTimer", TimeUnit.MILLISECONDS); creationTimer = Monitors.newTimer(name + "_CreateConnectionTimer", TimeUnit.MILLISECONDS); + this.name = name; Monitors.registerObject(name, this); } @@ -180,4 +182,9 @@ public class NamedConnectionPool extends ConnPoolByRoute { return this.getConnectionsInPool(); } + @Override + public void shutdown() { + super.shutdown(); + Monitors.unregisterObject(name, this); + } } diff --git a/ribbon-httpclient/src/main/java/com/netflix/niws/client/http/RestClient.java b/ribbon-httpclient/src/main/java/com/netflix/niws/client/http/RestClient.java index 2667728..6fb717a 100644 --- a/ribbon-httpclient/src/main/java/com/netflix/niws/client/http/RestClient.java +++ b/ribbon-httpclient/src/main/java/com/netflix/niws/client/http/RestClient.java @@ -26,12 +26,8 @@ import java.net.URL; import java.net.URLDecoder; import java.security.KeyStore; import java.util.Collection; -import java.util.Iterator; import java.util.Map; -import javax.ws.rs.core.MultivaluedMap; - -import com.netflix.client.ClientFactory; import org.apache.http.HttpHost; import org.apache.http.client.HttpClient; import org.apache.http.client.UserTokenHandler; @@ -53,7 +49,7 @@ import org.slf4j.LoggerFactory; import com.netflix.client.AbstractLoadBalancerAwareClient; import com.netflix.client.ClientException; -import com.netflix.client.ClientRequest; +import com.netflix.client.ClientFactory; import com.netflix.client.config.CommonClientConfigKey; import com.netflix.client.config.DefaultClientConfigImpl; import com.netflix.client.config.IClientConfig; @@ -68,10 +64,11 @@ import com.netflix.http4.NFHttpClientConstants; import com.netflix.http4.NFHttpClientFactory; import com.netflix.http4.NFHttpMethodRetryHandler; import com.netflix.http4.ssl.KeyStoreAwareSocketFactory; +import com.netflix.loadbalancer.BaseLoadBalancer; +import com.netflix.loadbalancer.ILoadBalancer; import com.netflix.niws.cert.AbstractSslContextFactory; import com.netflix.niws.client.ClientSslSocketFactoryException; import com.netflix.niws.client.URLSslContextFactory; -import com.netflix.niws.client.http.HttpClientRequest.Verb; import com.netflix.util.Pair; import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientResponse; @@ -685,4 +682,12 @@ public class RestClient extends AbstractLoadBalancerAwareClient