提交 46e24fb9 编写于 作者: A allenxwang

Merge pull request #77 from allenxwang/cp

Added connection creation timer. Use static ScheduledThreadPoolExecutor for all DynamicServerListLoadBalancer instances with configurable core thread pool size.
......@@ -35,6 +35,7 @@ import com.netflix.client.ClientFactory;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig;
import com.netflix.config.DynamicProperty;
import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.annotations.Monitor;
import com.google.common.annotations.VisibleForTesting;
......@@ -57,7 +58,7 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
boolean isSecure = false;
boolean useTunnel = false;
private Thread _shutdownThread;
private static Thread _shutdownThread;
// to keep track of modification of server lists
protected AtomicBoolean serverListUpdateInProgress = new AtomicBoolean(
......@@ -70,7 +71,7 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
// 30
// secs
private ScheduledThreadPoolExecutor _serverListRefreshExecutor = null;
private static ScheduledThreadPoolExecutor _serverListRefreshExecutor = null;
private long refeshIntervalMills = LISTOFSERVERS_CACHE_REPEAT_INTERVAL;
......@@ -81,7 +82,21 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
private AtomicLong lastUpdated = new AtomicLong(System.currentTimeMillis());
protected volatile boolean serverRefreshEnabled = false;
private final static String CORE_THREAD = "DynamicServerListLoadBalancer.ThreadPoolSize";
static {
int coreSize = DynamicProperty.getInstance(CORE_THREAD).getInteger(2);
ThreadFactory factory = (new ThreadFactoryBuilder()).setDaemon(true).build();
_serverListRefreshExecutor = new ScheduledThreadPoolExecutor(coreSize, factory);
_shutdownThread = new Thread(new Runnable() {
public void run() {
LOGGER.info("Shutting down the Executor Pool for DynamicServerListLoadBalancer");
shutdownExecutorPool();
}
});
Runtime.getRuntime().addShutdownHook(_shutdownThread);
}
public DynamicServerListLoadBalancer() {
super();
}
......@@ -200,26 +215,8 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
* feature enabled
*/
public void enableAndInitLearnNewServersFeature() {
String threadName = "DynamicServerListLoadBalancer-" + getIdentifier();
ThreadFactory factory = (new ThreadFactoryBuilder()).setDaemon(true)
.setNameFormat(threadName).build();
_serverListRefreshExecutor = new ScheduledThreadPoolExecutor(1, factory);
keepServerListUpdated();
serverRefreshEnabled = true;
// Add it to the shutdown hook
if (_shutdownThread == null) {
_shutdownThread = new Thread(new Runnable() {
public void run() {
LOGGER.info("Shutting down the Executor Pool for "
+ getIdentifier());
shutdownExecutorPool();
}
});
Runtime.getRuntime().addShutdownHook(_shutdownThread);
}
}
private String getIdentifier() {
......@@ -233,7 +230,7 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
TimeUnit.MILLISECONDS);
}
public void shutdownExecutorPool() {
private static void shutdownExecutorPool() {
if (_serverListRefreshExecutor != null) {
_serverListRefreshExecutor.shutdown();
......
/*
*
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.netflix.loadbalancer;
import static org.junit.Assert.*;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import com.google.common.collect.Lists;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig;
public class DynamicServerListLoadBalancerTest {
public static class MyServerList extends AbstractServerList<Server> {
public final static CountDownLatch latch = new CountDownLatch(5);
public static final List<Server> list = Lists.newArrayList(new Server("www.google.com:80"));
public MyServerList() {
}
public MyServerList(IClientConfig clientConfig) {
}
@Override
public List<Server> getInitialListOfServers() {
return list;
}
@Override
public List<Server> getUpdatedListOfServers() {
latch.countDown();
return list;
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
}
}
@Test
public void testDynamicServerListLoadBalancer() {
DefaultClientConfigImpl config = DefaultClientConfigImpl.getClientConfigWithDefaultValues();
config.setProperty(CommonClientConfigKey.NIWSServerListClassName, MyServerList.class.getName());
config.setProperty(CommonClientConfigKey.NFLoadBalancerClassName, DynamicServerListLoadBalancer.class.getName());
config.setProperty(CommonClientConfigKey.ServerListRefreshInterval, "50");
DynamicServerListLoadBalancer<Server> lb = new DynamicServerListLoadBalancer<Server>(config);
try {
assertTrue(MyServerList.latch.await(2, TimeUnit.SECONDS));
} catch (InterruptedException e) { // NOPMD
}
assertEquals(lb.getServerList(false), MyServerList.list);
}
}
......@@ -260,10 +260,6 @@ public class DiscoveryEnabledLoadBalancerSupportsPortOverrideTest {
Assert.assertEquals(8001, serverList2.get(0).getPort()); // client property indicated in ii
Assert.assertEquals(8001, serverList2.get(0).getInstanceInfo().getPort()); // client property indicated in ii
Assert.assertEquals(7002, serverList2.get(0).getInstanceInfo().getSecurePort()); // 7002 is the secure default
}
......
......@@ -54,6 +54,7 @@ public class NamedConnectionPool extends ConnPoolByRoute {
private Counter releaseCounter;
private Counter deleteCounter;
private Timer requestTimer;
private Timer creationTimer;
public NamedConnectionPool(String name, ClientConnectionOperator operator,
ConnPerRoute connPerRoute, int maxTotalConnections, long connTTL,
......@@ -97,7 +98,8 @@ public class NamedConnectionPool extends ConnPoolByRoute {
requestCounter = Monitors.newCounter(name + "_Request");
releaseCounter = Monitors.newCounter(name + "_Release");
deleteCounter = Monitors.newCounter(name + "_Delete");
requestTimer = Monitors.newTimer(name + "RequestEntry", TimeUnit.MILLISECONDS);
requestTimer = Monitors.newTimer(name + "_RequestConnectionTimer", TimeUnit.MILLISECONDS);
creationTimer = Monitors.newTimer(name + "_CreateConnectionTimer", TimeUnit.MILLISECONDS);
Monitors.registerObject(name, this);
}
......@@ -120,11 +122,14 @@ public class NamedConnectionPool extends ConnPoolByRoute {
protected BasicPoolEntry createEntry(RouteSpecificPool rospl,
ClientConnectionOperator op) {
createEntryCounter.increment();
return super.createEntry(rospl, op);
Stopwatch stopWatch = creationTimer.start();
try {
return super.createEntry(rospl, op);
} finally {
stopWatch.stop();
}
}
@Override
protected BasicPoolEntry getEntryBlocking(HttpRoute route, Object state,
long timeout, TimeUnit tunit, WaitingThreadAborter aborter)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册