提交 f9add2b3 编写于 作者: A Allen Wang

Added connection creation timer. Use static ScheduledThreadPoolExecutor for...

Added connection creation timer. Use static ScheduledThreadPoolExecutor for all DynamicServerListLoadBalancer instances with configurable core thread pool size.
上级 f4e8aa34
...@@ -35,6 +35,7 @@ import com.netflix.client.ClientFactory; ...@@ -35,6 +35,7 @@ import com.netflix.client.ClientFactory;
import com.netflix.client.config.CommonClientConfigKey; import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.DefaultClientConfigImpl; import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig; import com.netflix.client.config.IClientConfig;
import com.netflix.config.DynamicProperty;
import com.netflix.servo.annotations.DataSourceType; import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.annotations.Monitor; import com.netflix.servo.annotations.Monitor;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
...@@ -57,7 +58,7 @@ public class DynamicServerListLoadBalancer<T extends Server> extends ...@@ -57,7 +58,7 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
boolean isSecure = false; boolean isSecure = false;
boolean useTunnel = false; boolean useTunnel = false;
private Thread _shutdownThread; private static Thread _shutdownThread;
// to keep track of modification of server lists // to keep track of modification of server lists
protected AtomicBoolean serverListUpdateInProgress = new AtomicBoolean( protected AtomicBoolean serverListUpdateInProgress = new AtomicBoolean(
...@@ -70,7 +71,7 @@ public class DynamicServerListLoadBalancer<T extends Server> extends ...@@ -70,7 +71,7 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
// 30 // 30
// secs // secs
private ScheduledThreadPoolExecutor _serverListRefreshExecutor = null; private static ScheduledThreadPoolExecutor _serverListRefreshExecutor = null;
private long refeshIntervalMills = LISTOFSERVERS_CACHE_REPEAT_INTERVAL; private long refeshIntervalMills = LISTOFSERVERS_CACHE_REPEAT_INTERVAL;
...@@ -81,7 +82,21 @@ public class DynamicServerListLoadBalancer<T extends Server> extends ...@@ -81,7 +82,21 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
private AtomicLong lastUpdated = new AtomicLong(System.currentTimeMillis()); private AtomicLong lastUpdated = new AtomicLong(System.currentTimeMillis());
protected volatile boolean serverRefreshEnabled = false; protected volatile boolean serverRefreshEnabled = false;
private final static String CORE_THREAD = "DynamicServerListLoadBalancer.ThreadPoolSize";
static {
int coreSize = DynamicProperty.getInstance(CORE_THREAD).getInteger(2);
ThreadFactory factory = (new ThreadFactoryBuilder()).setDaemon(true).build();
_serverListRefreshExecutor = new ScheduledThreadPoolExecutor(coreSize, factory);
_shutdownThread = new Thread(new Runnable() {
public void run() {
LOGGER.info("Shutting down the Executor Pool for DynamicServerListLoadBalancer");
shutdownExecutorPool();
}
});
Runtime.getRuntime().addShutdownHook(_shutdownThread);
}
public DynamicServerListLoadBalancer() { public DynamicServerListLoadBalancer() {
super(); super();
} }
...@@ -200,26 +215,8 @@ public class DynamicServerListLoadBalancer<T extends Server> extends ...@@ -200,26 +215,8 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
* feature enabled * feature enabled
*/ */
public void enableAndInitLearnNewServersFeature() { public void enableAndInitLearnNewServersFeature() {
String threadName = "DynamicServerListLoadBalancer-" + getIdentifier();
ThreadFactory factory = (new ThreadFactoryBuilder()).setDaemon(true)
.setNameFormat(threadName).build();
_serverListRefreshExecutor = new ScheduledThreadPoolExecutor(1, factory);
keepServerListUpdated(); keepServerListUpdated();
serverRefreshEnabled = true; serverRefreshEnabled = true;
// Add it to the shutdown hook
if (_shutdownThread == null) {
_shutdownThread = new Thread(new Runnable() {
public void run() {
LOGGER.info("Shutting down the Executor Pool for "
+ getIdentifier());
shutdownExecutorPool();
}
});
Runtime.getRuntime().addShutdownHook(_shutdownThread);
}
} }
private String getIdentifier() { private String getIdentifier() {
...@@ -233,7 +230,7 @@ public class DynamicServerListLoadBalancer<T extends Server> extends ...@@ -233,7 +230,7 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
TimeUnit.MILLISECONDS); TimeUnit.MILLISECONDS);
} }
public void shutdownExecutorPool() { private static void shutdownExecutorPool() {
if (_serverListRefreshExecutor != null) { if (_serverListRefreshExecutor != null) {
_serverListRefreshExecutor.shutdown(); _serverListRefreshExecutor.shutdown();
......
...@@ -260,10 +260,6 @@ public class DiscoveryEnabledLoadBalancerSupportsPortOverrideTest { ...@@ -260,10 +260,6 @@ public class DiscoveryEnabledLoadBalancerSupportsPortOverrideTest {
Assert.assertEquals(8001, serverList2.get(0).getPort()); // client property indicated in ii Assert.assertEquals(8001, serverList2.get(0).getPort()); // client property indicated in ii
Assert.assertEquals(8001, serverList2.get(0).getInstanceInfo().getPort()); // client property indicated in ii Assert.assertEquals(8001, serverList2.get(0).getInstanceInfo().getPort()); // client property indicated in ii
Assert.assertEquals(7002, serverList2.get(0).getInstanceInfo().getSecurePort()); // 7002 is the secure default Assert.assertEquals(7002, serverList2.get(0).getInstanceInfo().getSecurePort()); // 7002 is the secure default
} }
......
...@@ -54,6 +54,7 @@ public class NamedConnectionPool extends ConnPoolByRoute { ...@@ -54,6 +54,7 @@ public class NamedConnectionPool extends ConnPoolByRoute {
private Counter releaseCounter; private Counter releaseCounter;
private Counter deleteCounter; private Counter deleteCounter;
private Timer requestTimer; private Timer requestTimer;
private Timer creationTimer;
public NamedConnectionPool(String name, ClientConnectionOperator operator, public NamedConnectionPool(String name, ClientConnectionOperator operator,
ConnPerRoute connPerRoute, int maxTotalConnections, long connTTL, ConnPerRoute connPerRoute, int maxTotalConnections, long connTTL,
...@@ -97,7 +98,8 @@ public class NamedConnectionPool extends ConnPoolByRoute { ...@@ -97,7 +98,8 @@ public class NamedConnectionPool extends ConnPoolByRoute {
requestCounter = Monitors.newCounter(name + "_Request"); requestCounter = Monitors.newCounter(name + "_Request");
releaseCounter = Monitors.newCounter(name + "_Release"); releaseCounter = Monitors.newCounter(name + "_Release");
deleteCounter = Monitors.newCounter(name + "_Delete"); deleteCounter = Monitors.newCounter(name + "_Delete");
requestTimer = Monitors.newTimer(name + "RequestEntry", TimeUnit.MILLISECONDS); requestTimer = Monitors.newTimer(name + "_RequestConnectionTimer", TimeUnit.MILLISECONDS);
creationTimer = Monitors.newTimer(name + "_CreateConnectionTimer", TimeUnit.MILLISECONDS);
Monitors.registerObject(name, this); Monitors.registerObject(name, this);
} }
...@@ -119,12 +121,15 @@ public class NamedConnectionPool extends ConnPoolByRoute { ...@@ -119,12 +121,15 @@ public class NamedConnectionPool extends ConnPoolByRoute {
@Override @Override
protected BasicPoolEntry createEntry(RouteSpecificPool rospl, protected BasicPoolEntry createEntry(RouteSpecificPool rospl,
ClientConnectionOperator op) { ClientConnectionOperator op) {
createEntryCounter.increment(); Stopwatch stopWatch = creationTimer.start();
return super.createEntry(rospl, op); try {
createEntryCounter.increment();
return super.createEntry(rospl, op);
} finally {
stopWatch.stop();
}
} }
@Override @Override
protected BasicPoolEntry getEntryBlocking(HttpRoute route, Object state, protected BasicPoolEntry getEntryBlocking(HttpRoute route, Object state,
long timeout, TimeUnit tunit, WaitingThreadAborter aborter) long timeout, TimeUnit tunit, WaitingThreadAborter aborter)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册