提交 fbde8055 编写于 作者: A allenxwang

Merge pull request #78 from allenxwang/cp

Add capability to stop individual server list refresh and make thread pool size dynamically configurable.
......@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
......@@ -35,6 +36,7 @@ import com.netflix.client.ClientFactory;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig;
import com.netflix.config.DynamicIntProperty;
import com.netflix.config.DynamicProperty;
import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.annotations.Monitor;
......@@ -83,11 +85,21 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
protected volatile boolean serverRefreshEnabled = false;
private final static String CORE_THREAD = "DynamicServerListLoadBalancer.ThreadPoolSize";
private final static DynamicIntProperty poolSizeProp = new DynamicIntProperty(CORE_THREAD, 2);
private volatile ScheduledFuture<?> scheduledFuture;
static {
int coreSize = DynamicProperty.getInstance(CORE_THREAD).getInteger(2);
int coreSize = poolSizeProp.get();
ThreadFactory factory = (new ThreadFactoryBuilder()).setDaemon(true).build();
_serverListRefreshExecutor = new ScheduledThreadPoolExecutor(coreSize, factory);
poolSizeProp.addCallback(new Runnable() {
@Override
public void run() {
_serverListRefreshExecutor.setCorePoolSize(poolSizeProp.get());
}
});
_shutdownThread = new Thread(new Runnable() {
public void run() {
LOGGER.info("Shutting down the Executor Pool for DynamicServerListLoadBalancer");
......@@ -224,7 +236,7 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
}
private void keepServerListUpdated() {
_serverListRefreshExecutor.scheduleAtFixedRate(
scheduledFuture = _serverListRefreshExecutor.scheduleAtFixedRate(
new ServerListRefreshExecutorThread(),
LISTOFSERVERS_CACHE_UPDATE_DELAY, refeshIntervalMills,
TimeUnit.MILLISECONDS);
......@@ -247,6 +259,13 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
}
}
public void stopServerListRefreshing() {
serverRefreshEnabled = false;
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
}
/**
* Class that updates the list of Servers This is based on the method used
* by the client * Appropriate Filters are applied before coming up with the
......@@ -258,6 +277,9 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
class ServerListRefreshExecutorThread implements Runnable {
public void run() {
if (!serverRefreshEnabled) {
return;
}
try {
updateListOfServers();
......@@ -322,6 +344,15 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
return new Date(lastUpdated.get()).toString();
}
@Monitor(name="NumThreads", type=DataSourceType.GAUGE)
public int getCoreThreads() {
if (_serverListRefreshExecutor != null) {
return _serverListRefreshExecutor.getCorePoolSize();
} else {
return 0;
}
}
public String toString() {
StringBuilder sb = new StringBuilder("DynamicServerListLoadBalancer:");
sb.append(super.toString());
......
......@@ -22,6 +22,7 @@ import static org.junit.Assert.*;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
......@@ -34,6 +35,7 @@ public class DynamicServerListLoadBalancerTest {
public static class MyServerList extends AbstractServerList<Server> {
public final static CountDownLatch latch = new CountDownLatch(5);
public final static AtomicInteger counter = new AtomicInteger(0);
public static final List<Server> list = Lists.newArrayList(new Server("www.google.com:80"));
......@@ -50,6 +52,7 @@ public class DynamicServerListLoadBalancerTest {
@Override
public List<Server> getUpdatedListOfServers() {
counter.incrementAndGet();
latch.countDown();
return list;
}
......@@ -61,7 +64,7 @@ public class DynamicServerListLoadBalancerTest {
}
@Test
public void testDynamicServerListLoadBalancer() {
public void testDynamicServerListLoadBalancer() throws Exception {
DefaultClientConfigImpl config = DefaultClientConfigImpl.getClientConfigWithDefaultValues();
config.setProperty(CommonClientConfigKey.NIWSServerListClassName, MyServerList.class.getName());
config.setProperty(CommonClientConfigKey.NFLoadBalancerClassName, DynamicServerListLoadBalancer.class.getName());
......@@ -72,6 +75,13 @@ public class DynamicServerListLoadBalancerTest {
} catch (InterruptedException e) { // NOPMD
}
assertEquals(lb.getServerList(false), MyServerList.list);
lb.stopServerListRefreshing();
Thread.sleep(1000);
int count = MyServerList.counter.get();
assertTrue(count >= 5);
Thread.sleep(1000);
assertEquals(count, MyServerList.counter.get());
}
}
......@@ -30,6 +30,7 @@ import java.util.Set;
import org.junit.Test;
import com.netflix.client.ClientFactory;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.http.HttpRequest;
import com.netflix.client.http.HttpResponse;
import com.netflix.config.ConfigurationManager;
......@@ -50,15 +51,14 @@ public class RestClientTest {
@Test
public void testExecuteWithLB() throws Exception {
ConfigurationManager.getConfigInstance().setProperty("allservices.ribbon." + CommonClientConfigKey.ReadTimeout, "10000");
RestClient client = (RestClient) ClientFactory.getNamedClient("allservices");
BaseLoadBalancer lb = new BaseLoadBalancer();
Server[] servers = new Server[]{new Server("www.google.com", 80), new Server("www.yahoo.com", 80), new Server("www.microsoft.com", 80)};
Server[] servers = new Server[]{new Server("www.google.com", 80)};
lb.addServers(Arrays.asList(servers));
client.setLoadBalancer(lb);
Set<URI> expected = new HashSet<URI>();
expected.add(new URI("http://www.google.com:80/"));
expected.add(new URI("http://www.microsoft.com:80/"));
expected.add(new URI("http://www.yahoo.com:80/"));
Set<URI> result = new HashSet<URI>();
HttpRequest request = HttpRequest.newBuilder().uri(new URI("/")).build();
for (int i = 0; i < 5; i++) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册