提交 0fcf7094 编写于 作者: A Allen Wang

Add capability to stop individual server list refresh. make thread pool size...

Add capability to stop individual server list refresh. make thread pool size dynamically configurable.
上级 926ef241
......@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
......@@ -35,6 +36,7 @@ import com.netflix.client.ClientFactory;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig;
import com.netflix.config.DynamicIntProperty;
import com.netflix.config.DynamicProperty;
import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.annotations.Monitor;
......@@ -83,11 +85,21 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
protected volatile boolean serverRefreshEnabled = false;
private final static String CORE_THREAD = "DynamicServerListLoadBalancer.ThreadPoolSize";
private final static DynamicIntProperty poolSizeProp = new DynamicIntProperty(CORE_THREAD, 2);
private volatile ScheduledFuture<?> scheduledFuture;
static {
int coreSize = DynamicProperty.getInstance(CORE_THREAD).getInteger(2);
int coreSize = poolSizeProp.get();
ThreadFactory factory = (new ThreadFactoryBuilder()).setDaemon(true).build();
_serverListRefreshExecutor = new ScheduledThreadPoolExecutor(coreSize, factory);
poolSizeProp.addCallback(new Runnable() {
@Override
public void run() {
_serverListRefreshExecutor.setCorePoolSize(poolSizeProp.get());
}
});
_shutdownThread = new Thread(new Runnable() {
public void run() {
LOGGER.info("Shutting down the Executor Pool for DynamicServerListLoadBalancer");
......@@ -224,7 +236,7 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
}
private void keepServerListUpdated() {
_serverListRefreshExecutor.scheduleAtFixedRate(
scheduledFuture = _serverListRefreshExecutor.scheduleAtFixedRate(
new ServerListRefreshExecutorThread(),
LISTOFSERVERS_CACHE_UPDATE_DELAY, refeshIntervalMills,
TimeUnit.MILLISECONDS);
......@@ -247,6 +259,13 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
}
}
public void stopServerListRefreshing() {
serverRefreshEnabled = false;
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
}
/**
* Class that updates the list of Servers This is based on the method used
* by the client * Appropriate Filters are applied before coming up with the
......@@ -258,6 +277,9 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
class ServerListRefreshExecutorThread implements Runnable {
public void run() {
if (!serverRefreshEnabled) {
return;
}
try {
updateListOfServers();
......@@ -322,6 +344,15 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
return new Date(lastUpdated.get()).toString();
}
@Monitor(name="NumThreads", type=DataSourceType.GAUGE)
public int getCoreThreads() {
if (_serverListRefreshExecutor != null) {
return _serverListRefreshExecutor.getCorePoolSize();
} else {
return 0;
}
}
public String toString() {
StringBuilder sb = new StringBuilder("DynamicServerListLoadBalancer:");
sb.append(super.toString());
......
......@@ -22,6 +22,7 @@ import static org.junit.Assert.*;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
......@@ -34,6 +35,7 @@ public class DynamicServerListLoadBalancerTest {
public static class MyServerList extends AbstractServerList<Server> {
public final static CountDownLatch latch = new CountDownLatch(5);
public final static AtomicInteger counter = new AtomicInteger(0);
public static final List<Server> list = Lists.newArrayList(new Server("www.google.com:80"));
......@@ -50,6 +52,7 @@ public class DynamicServerListLoadBalancerTest {
@Override
public List<Server> getUpdatedListOfServers() {
counter.incrementAndGet();
latch.countDown();
return list;
}
......@@ -61,7 +64,7 @@ public class DynamicServerListLoadBalancerTest {
}
@Test
public void testDynamicServerListLoadBalancer() {
public void testDynamicServerListLoadBalancer() throws Exception {
DefaultClientConfigImpl config = DefaultClientConfigImpl.getClientConfigWithDefaultValues();
config.setProperty(CommonClientConfigKey.NIWSServerListClassName, MyServerList.class.getName());
config.setProperty(CommonClientConfigKey.NFLoadBalancerClassName, DynamicServerListLoadBalancer.class.getName());
......@@ -72,6 +75,13 @@ public class DynamicServerListLoadBalancerTest {
} catch (InterruptedException e) { // NOPMD
}
assertEquals(lb.getServerList(false), MyServerList.list);
lb.stopServerListRefreshing();
Thread.sleep(1000);
int count = MyServerList.counter.get();
assertTrue(count >= 5);
Thread.sleep(1000);
assertEquals(count, MyServerList.counter.get());
}
}
......@@ -30,6 +30,7 @@ import java.util.Set;
import org.junit.Test;
import com.netflix.client.ClientFactory;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.http.HttpRequest;
import com.netflix.client.http.HttpResponse;
import com.netflix.config.ConfigurationManager;
......@@ -50,15 +51,14 @@ public class RestClientTest {
@Test
public void testExecuteWithLB() throws Exception {
ConfigurationManager.getConfigInstance().setProperty("allservices.ribbon." + CommonClientConfigKey.ReadTimeout, "10000");
RestClient client = (RestClient) ClientFactory.getNamedClient("allservices");
BaseLoadBalancer lb = new BaseLoadBalancer();
Server[] servers = new Server[]{new Server("www.google.com", 80), new Server("www.yahoo.com", 80), new Server("www.microsoft.com", 80)};
Server[] servers = new Server[]{new Server("www.google.com", 80)};
lb.addServers(Arrays.asList(servers));
client.setLoadBalancer(lb);
Set<URI> expected = new HashSet<URI>();
expected.add(new URI("http://www.google.com:80/"));
expected.add(new URI("http://www.microsoft.com:80/"));
expected.add(new URI("http://www.yahoo.com:80/"));
Set<URI> result = new HashSet<URI>();
HttpRequest request = HttpRequest.newBuilder().uri(new URI("/")).build();
for (int i = 0; i < 5; i++) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册