提交 6177e44d 编写于 作者: A Allen Wang

Fix Issue #28

上级 195fb203
...@@ -310,8 +310,9 @@ public class PrimeConnections { ...@@ -310,8 +310,9 @@ public class PrimeConnections {
return executorService.submit(ftConn); return executorService.submit(ftConn);
} }
void shutdown() { public void shutdown() {
executorService.shutdown(); executorService.shutdown();
Monitors.unregisterObject(name + "_PrimeConnection", this);
} }
private Boolean connectToServer(final Server s, final PrimeConnectionListener listener) { private Boolean connectToServer(final Server s, final PrimeConnectionListener listener) {
......
...@@ -855,4 +855,13 @@ public class BaseLoadBalancer extends AbstractLoadBalancer implements ...@@ -855,4 +855,13 @@ public class BaseLoadBalancer extends AbstractLoadBalancer implements
boolean enablePrimingConnections) { boolean enablePrimingConnections) {
this.enablePrimingConnections = enablePrimingConnections; this.enablePrimingConnections = enablePrimingConnections;
} }
public void shutdown() {
cancelPingTask();
if (primeConnections != null) {
primeConnections.shutdown();
}
Monitors.unregisterObject("LoadBalancer_" + name, this);
Monitors.unregisterObject("Rule_" + name, this.getRule());
}
} }
...@@ -278,6 +278,9 @@ public class DynamicServerListLoadBalancer<T extends Server> extends ...@@ -278,6 +278,9 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
public void run() { public void run() {
if (!serverRefreshEnabled) { if (!serverRefreshEnabled) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return; return;
} }
try { try {
...@@ -353,10 +356,17 @@ public class DynamicServerListLoadBalancer<T extends Server> extends ...@@ -353,10 +356,17 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
} }
} }
@Override
public String toString() { public String toString() {
StringBuilder sb = new StringBuilder("DynamicServerListLoadBalancer:"); StringBuilder sb = new StringBuilder("DynamicServerListLoadBalancer:");
sb.append(super.toString()); sb.append(super.toString());
sb.append("ServerList:" + String.valueOf(serverListImpl)); sb.append("ServerList:" + String.valueOf(serverListImpl));
return sb.toString(); return sb.toString();
} }
@Override
public void shutdown() {
super.shutdown();
stopServerListRefreshing();
}
} }
package com.netflix.utils;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import com.netflix.config.DynamicIntProperty;
/**
* A {@link ScheduledThreadPoolExecutor} whose core size can be dynamically changed by a given {@link DynamicIntProperty} and
* registers itself with a shutdown hook to shut down.
*
* @author awang
*
*/
public class ScheduledThreadPoolExectuorWithDynamicSize extends ScheduledThreadPoolExecutor {
private final Thread shutdownThread;
public ScheduledThreadPoolExectuorWithDynamicSize(final DynamicIntProperty corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize.get(), threadFactory);
corePoolSize.addCallback(new Runnable() {
public void run() {
setCorePoolSize(corePoolSize.get());
}
});
shutdownThread = new Thread(new Runnable() {
public void run() {
shutdown();
if (shutdownThread != null) {
try {
Runtime.getRuntime().removeShutdownHook(shutdownThread);
} catch (IllegalStateException ise) { // NOPMD
}
}
}
});
Runtime.getRuntime().addShutdownHook(shutdownThread);
}
}
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
*/ */
package com.netflix.http4; package com.netflix.http4;
import java.util.Timer; import java.util.concurrent.ScheduledExecutorService;
import java.util.TimerTask; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.http.conn.ClientConnectionManager; import org.apache.http.conn.ClientConnectionManager;
...@@ -40,19 +40,18 @@ public class ConnectionPoolCleaner { ...@@ -40,19 +40,18 @@ public class ConnectionPoolCleaner {
String name = "default"; String name = "default";
ClientConnectionManager connMgr; ClientConnectionManager connMgr;
Timer timer; ScheduledExecutorService scheduler;
private DynamicIntProperty connIdleEvictTimeMilliSeconds private DynamicIntProperty connIdleEvictTimeMilliSeconds
= DynamicPropertyFactory.getInstance().getIntProperty("default.nfhttpclient.connIdleEvictTimeMilliSeconds", = DynamicPropertyFactory.getInstance().getIntProperty("default.nfhttpclient.connIdleEvictTimeMilliSeconds",
NFHttpClientConstants.DEFAULT_CONNECTIONIDLE_TIME_IN_MSECS); NFHttpClientConstants.DEFAULT_CONNECTIONIDLE_TIME_IN_MSECS);
boolean enableConnectionPoolCleanerTask = false; volatile boolean enableConnectionPoolCleanerTask = false;
long connectionCleanerTimerDelay = 10; long connectionCleanerTimerDelay = 10;
long connectionCleanerRepeatInterval = NFHttpClientConstants.DEFAULT_CONNECTION_IDLE_TIMERTASK_REPEAT_IN_MSECS; long connectionCleanerRepeatInterval = NFHttpClientConstants.DEFAULT_CONNECTION_IDLE_TIMERTASK_REPEAT_IN_MSECS;
private volatile ScheduledFuture<?> scheduledFuture;
public ConnectionPoolCleaner(String name, ClientConnectionManager connMgr){ public ConnectionPoolCleaner(String name, ClientConnectionManager connMgr, ScheduledExecutorService scheduler){
this.name = name; this.name = name;
this.connMgr = connMgr; this.connMgr = connMgr;
} }
...@@ -94,28 +93,22 @@ public class ConnectionPoolCleaner { ...@@ -94,28 +93,22 @@ public class ConnectionPoolCleaner {
} }
public void initTask(){ public void initTask(){
if (enableConnectionPoolCleanerTask){ if (enableConnectionPoolCleanerTask) {
timer = new Timer(name + "-ConnectionPoolCleanerThread", true); scheduledFuture = scheduler.scheduleWithFixedDelay(new Runnable() {
timer.schedule(new TimerTask() { public void run() {
try {
public void run() { if (enableConnectionPoolCleanerTask) {
try { logger.debug("Connection pool clean up started for client {}", name);
cleanupConnections(); cleanupConnections();
} catch (Throwable e) { } else if (scheduledFuture != null) {
logger.error("Exception in ConnectionPoolCleanerThread",e); scheduledFuture.cancel(true);
//e.printStackTrace();
} }
} catch (Throwable e) {
logger.error("Exception in ConnectionPoolCleanerThread",e);
} }
}, connectionCleanerTimerDelay, connectionCleanerRepeatInterval);
logger.info("Initializing ConnectionPoolCleaner for NFHttpClient:" + name);
// Add it to the shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run()
{
logger.info("Stopping the ConnectionPoolCleaner Update Task");
timer.cancel();
} }
})); }, connectionCleanerTimerDelay, connectionCleanerRepeatInterval, TimeUnit.MILLISECONDS);
logger.info("Initializing ConnectionPoolCleaner for NFHttpClient:" + name);
} }
} }
...@@ -124,6 +117,13 @@ public class ConnectionPoolCleaner { ...@@ -124,6 +117,13 @@ public class ConnectionPoolCleaner {
connMgr.closeIdleConnections(connIdleEvictTimeMilliSeconds.get(), TimeUnit.MILLISECONDS); connMgr.closeIdleConnections(connIdleEvictTimeMilliSeconds.get(), TimeUnit.MILLISECONDS);
} }
public void shutdown() {
enableConnectionPoolCleanerTask = false;
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
}
public String toString(){ public String toString(){
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
...@@ -136,4 +136,5 @@ public class ConnectionPoolCleaner { ...@@ -136,4 +136,5 @@ public class ConnectionPoolCleaner {
} }
} }
...@@ -83,6 +83,4 @@ public class MonitoredConnectionManager extends ThreadSafeClientConnManager { ...@@ -83,6 +83,4 @@ public class MonitoredConnectionManager extends ThreadSafeClientConnManager {
// TODO Auto-generated method stub // TODO Auto-generated method stub
return super.requestConnection(route, state); return super.requestConnection(route, state);
} }
} }
...@@ -21,6 +21,8 @@ import java.io.IOException; ...@@ -21,6 +21,8 @@ import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
...@@ -44,6 +46,7 @@ import org.apache.http.protocol.HttpContext; ...@@ -44,6 +46,7 @@ import org.apache.http.protocol.HttpContext;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.netflix.client.config.CommonClientConfigKey; import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.DefaultClientConfigImpl; import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig; import com.netflix.client.config.IClientConfig;
...@@ -54,6 +57,7 @@ import com.netflix.servo.annotations.Monitor; ...@@ -54,6 +57,7 @@ import com.netflix.servo.annotations.Monitor;
import com.netflix.servo.monitor.Monitors; import com.netflix.servo.monitor.Monitors;
import com.netflix.servo.monitor.Stopwatch; import com.netflix.servo.monitor.Stopwatch;
import com.netflix.servo.monitor.Timer; import com.netflix.servo.monitor.Timer;
import com.netflix.utils.ScheduledThreadPoolExectuorWithDynamicSize;
/** /**
* Netflix extension of Apache 4.0 HttpClient * Netflix extension of Apache 4.0 HttpClient
...@@ -67,7 +71,11 @@ public class NFHttpClient extends DefaultHttpClient { ...@@ -67,7 +71,11 @@ public class NFHttpClient extends DefaultHttpClient {
private static final Logger LOGGER = LoggerFactory.getLogger(NFHttpClient.class); private static final Logger LOGGER = LoggerFactory.getLogger(NFHttpClient.class);
protected static final String EXECUTE_TRACER = "HttpClient-ExecuteTimer"; protected static final String EXECUTE_TRACER = "HttpClient-ExecuteTimer";
private static final DynamicIntProperty CORE_SIZE = new DynamicIntProperty("NFHttpClient.connectionPoolCleanerNumberCoreThreads", 2);
private static ScheduledExecutorService connectionPoolCleanUpScheduler;
private HttpHost httpHost = null; private HttpHost httpHost = null;
private HttpRoute httpRoute = null; private HttpRoute httpRoute = null;
...@@ -87,6 +95,13 @@ public class NFHttpClient extends DefaultHttpClient { ...@@ -87,6 +95,13 @@ public class NFHttpClient extends DefaultHttpClient {
private DynamicIntProperty maxTotalConnectionProperty; private DynamicIntProperty maxTotalConnectionProperty;
private DynamicIntProperty maxConnectionPerHostProperty; private DynamicIntProperty maxConnectionPerHostProperty;
static {
ThreadFactory factory = (new ThreadFactoryBuilder()).setDaemon(true)
.setNameFormat("Connection pool clean up thread")
.build();
connectionPoolCleanUpScheduler = new ScheduledThreadPoolExectuorWithDynamicSize(CORE_SIZE, factory);
}
protected NFHttpClient(String host, int port){ protected NFHttpClient(String host, int port){
super(new ThreadSafeClientConnManager()); super(new ThreadSafeClientConnManager());
this.name = "UNNAMED_" + numNonNamedHttpClients.incrementAndGet(); this.name = "UNNAMED_" + numNonNamedHttpClients.incrementAndGet();
...@@ -128,7 +143,7 @@ public class NFHttpClient extends DefaultHttpClient { ...@@ -128,7 +143,7 @@ public class NFHttpClient extends DefaultHttpClient {
defaultHeaders.add(new BasicHeader("X-netflix-httpclientname", name)); defaultHeaders.add(new BasicHeader("X-netflix-httpclientname", name));
params.setParameter(ClientPNames.DEFAULT_HEADERS, defaultHeaders); params.setParameter(ClientPNames.DEFAULT_HEADERS, defaultHeaders);
connPoolCleaner = new ConnectionPoolCleaner(name, this.getConnectionManager()); connPoolCleaner = new ConnectionPoolCleaner(name, this.getConnectionManager(), connectionPoolCleanUpScheduler);
this.retriesProperty = DynamicPropertyFactory.getInstance().getIntProperty(this.name + ".nfhttpclient" + ".retries", 3); this.retriesProperty = DynamicPropertyFactory.getInstance().getIntProperty(this.name + ".nfhttpclient" + ".retries", 3);
this.sleepTimeFactorMsProperty = DynamicPropertyFactory.getInstance().getIntProperty(this.name + ".nfhttpclient"+ ".sleepTimeFactorMs", 10); this.sleepTimeFactorMsProperty = DynamicPropertyFactory.getInstance().getIntProperty(this.name + ".nfhttpclient"+ ".sleepTimeFactorMs", 10);
...@@ -292,4 +307,11 @@ public class NFHttpClient extends DefaultHttpClient { ...@@ -292,4 +307,11 @@ public class NFHttpClient extends DefaultHttpClient {
sw.stop(); sw.stop();
} }
} }
}
\ No newline at end of file public void shutdown() {
if (connPoolCleaner != null) {
connPoolCleaner.shutdown();
}
getConnectionManager().shutdown();
}
}
...@@ -21,7 +21,6 @@ import java.util.Map; ...@@ -21,7 +21,6 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections.keyvalue.MultiKey; import org.apache.commons.collections.keyvalue.MultiKey;
import org.apache.http.client.HttpClient;
import com.netflix.client.config.DefaultClientConfigImpl; import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig; import com.netflix.client.config.IClientConfig;
...@@ -89,9 +88,9 @@ public class NFHttpClientFactory { ...@@ -89,9 +88,9 @@ public class NFHttpClientFactory {
} }
public static void shutdownNFHttpClient(String name) { public static void shutdownNFHttpClient(String name) {
HttpClient c = namedClientMap.get(name); NFHttpClient c = namedClientMap.get(name);
if(c != null) { if (c != null) {
c.getConnectionManager().shutdown(); c.shutdown();
namedClientMap.remove(name); namedClientMap.remove(name);
Monitors.unregisterObject(name, c); Monitors.unregisterObject(name, c);
} }
......
...@@ -55,6 +55,7 @@ public class NamedConnectionPool extends ConnPoolByRoute { ...@@ -55,6 +55,7 @@ public class NamedConnectionPool extends ConnPoolByRoute {
private Counter deleteCounter; private Counter deleteCounter;
private Timer requestTimer; private Timer requestTimer;
private Timer creationTimer; private Timer creationTimer;
private String name;
public NamedConnectionPool(String name, ClientConnectionOperator operator, public NamedConnectionPool(String name, ClientConnectionOperator operator,
ConnPerRoute connPerRoute, int maxTotalConnections, long connTTL, ConnPerRoute connPerRoute, int maxTotalConnections, long connTTL,
...@@ -100,6 +101,7 @@ public class NamedConnectionPool extends ConnPoolByRoute { ...@@ -100,6 +101,7 @@ public class NamedConnectionPool extends ConnPoolByRoute {
deleteCounter = Monitors.newCounter(name + "_Delete"); deleteCounter = Monitors.newCounter(name + "_Delete");
requestTimer = Monitors.newTimer(name + "_RequestConnectionTimer", TimeUnit.MILLISECONDS); requestTimer = Monitors.newTimer(name + "_RequestConnectionTimer", TimeUnit.MILLISECONDS);
creationTimer = Monitors.newTimer(name + "_CreateConnectionTimer", TimeUnit.MILLISECONDS); creationTimer = Monitors.newTimer(name + "_CreateConnectionTimer", TimeUnit.MILLISECONDS);
this.name = name;
Monitors.registerObject(name, this); Monitors.registerObject(name, this);
} }
...@@ -180,4 +182,9 @@ public class NamedConnectionPool extends ConnPoolByRoute { ...@@ -180,4 +182,9 @@ public class NamedConnectionPool extends ConnPoolByRoute {
return this.getConnectionsInPool(); return this.getConnectionsInPool();
} }
@Override
public void shutdown() {
super.shutdown();
Monitors.unregisterObject(name, this);
}
} }
...@@ -26,12 +26,8 @@ import java.net.URL; ...@@ -26,12 +26,8 @@ import java.net.URL;
import java.net.URLDecoder; import java.net.URLDecoder;
import java.security.KeyStore; import java.security.KeyStore;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator;
import java.util.Map; import java.util.Map;
import javax.ws.rs.core.MultivaluedMap;
import com.netflix.client.ClientFactory;
import org.apache.http.HttpHost; import org.apache.http.HttpHost;
import org.apache.http.client.HttpClient; import org.apache.http.client.HttpClient;
import org.apache.http.client.UserTokenHandler; import org.apache.http.client.UserTokenHandler;
...@@ -53,7 +49,7 @@ import org.slf4j.LoggerFactory; ...@@ -53,7 +49,7 @@ import org.slf4j.LoggerFactory;
import com.netflix.client.AbstractLoadBalancerAwareClient; import com.netflix.client.AbstractLoadBalancerAwareClient;
import com.netflix.client.ClientException; import com.netflix.client.ClientException;
import com.netflix.client.ClientRequest; import com.netflix.client.ClientFactory;
import com.netflix.client.config.CommonClientConfigKey; import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.DefaultClientConfigImpl; import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig; import com.netflix.client.config.IClientConfig;
...@@ -68,10 +64,11 @@ import com.netflix.http4.NFHttpClientConstants; ...@@ -68,10 +64,11 @@ import com.netflix.http4.NFHttpClientConstants;
import com.netflix.http4.NFHttpClientFactory; import com.netflix.http4.NFHttpClientFactory;
import com.netflix.http4.NFHttpMethodRetryHandler; import com.netflix.http4.NFHttpMethodRetryHandler;
import com.netflix.http4.ssl.KeyStoreAwareSocketFactory; import com.netflix.http4.ssl.KeyStoreAwareSocketFactory;
import com.netflix.loadbalancer.BaseLoadBalancer;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.niws.cert.AbstractSslContextFactory; import com.netflix.niws.cert.AbstractSslContextFactory;
import com.netflix.niws.client.ClientSslSocketFactoryException; import com.netflix.niws.client.ClientSslSocketFactoryException;
import com.netflix.niws.client.URLSslContextFactory; import com.netflix.niws.client.URLSslContextFactory;
import com.netflix.niws.client.http.HttpClientRequest.Verb;
import com.netflix.util.Pair; import com.netflix.util.Pair;
import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.ClientResponse;
...@@ -685,4 +682,12 @@ public class RestClient extends AbstractLoadBalancerAwareClient<HttpRequest, Htt ...@@ -685,4 +682,12 @@ public class RestClient extends AbstractLoadBalancerAwareClient<HttpRequest, Htt
} }
return super.deriveHostAndPortFromVipAddress(vipAddress); return super.deriveHostAndPortFromVipAddress(vipAddress);
} }
public void shutdown() {
ILoadBalancer lb = this.getLoadBalancer();
if (lb instanceof BaseLoadBalancer) {
((BaseLoadBalancer) lb).shutdown();
}
NFHttpClientFactory.shutdownNFHttpClient(restClientName);
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册