提交 670912b3 编写于 作者: A allenxwang

Merge pull request #10 from allenxwang/master

Make ZoneAwareLoadBalancer's rule object for each zone configurable. Avoid duplicated connection priming at client initialization.
......@@ -20,7 +20,7 @@ package com.netflix.loadbalancer;
import com.netflix.client.IClientConfigAware;
/**
* Class that provides a
* Class that provides a default implementation for setting and getting load balancer
* @author stonse
*
*/
......@@ -36,5 +36,5 @@ public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAw
@Override
public ILoadBalancer getLoadBalancer(){
return lb;
}
}
}
......@@ -90,7 +90,9 @@ public class BaseLoadBalancer extends AbstractLoadBalancer implements
private PrimeConnections primeConnections;
private boolean enablePrimingConnections = false;
private volatile boolean enablePrimingConnections = false;
private IClientConfig config;
/**
* Default constructor which sets name as "default", sets null ping, and
......@@ -139,6 +141,7 @@ public class BaseLoadBalancer extends AbstractLoadBalancer implements
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
this.config = clientConfig;
String clientName = clientConfig.getClientName();
String ruleClassName = (String) clientConfig
.getProperty(CommonClientConfigKey.NFLoadBalancerRuleClassName);
......@@ -201,6 +204,10 @@ public class BaseLoadBalancer extends AbstractLoadBalancer implements
init();
}
public IClientConfig getClientConfig() {
return config;
}
private boolean canSkipPing() {
if (ping == null
|| ping.getClass().getName().equals(DummyPing.class.getName())) {
......@@ -462,7 +469,7 @@ public class BaseLoadBalancer extends AbstractLoadBalancer implements
if (!allServerList.equals(allServers)) {
listChanged = true;
}
if (enablePrimingConnections) {
if (isEnablePrimingConnections()) {
for (Server server : allServers) {
if (!allServerList.contains(server)) {
server.setReadyToServe(false);
......@@ -816,7 +823,7 @@ public class BaseLoadBalancer extends AbstractLoadBalancer implements
s.setReadyToServe(true);
}
public final boolean isEnablePrimingConnections() {
public boolean isEnablePrimingConnections() {
return enablePrimingConnections;
}
......
......@@ -73,8 +73,6 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
volatile ServerListFilter<T> filter;
IClientConfig niwsClientConfig;
public DynamicServerListLoadBalancer() {
super();
}
......@@ -87,7 +85,6 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
public void initWithNiwsConfig(IClientConfig clientConfig) {
try {
super.initWithNiwsConfig(clientConfig);
this.niwsClientConfig = clientConfig;
String niwsServerListClassName = clientConfig.getProperty(
CommonClientConfigKey.NIWSServerListClassName,
DefaultClientConfigImpl.DEFAULT_SEVER_LIST_CLASS)
......@@ -109,14 +106,17 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
CommonClientConfigKey.ServerListRefreshInterval,
LISTOFSERVERS_CACHE_REPEAT_INTERVAL).toString());
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
enableAndInitLearnNewServersFeature();
updateListOfServers();
if (this.isEnablePrimingConnections()
&& this.getPrimeConnections() != null) {
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getServerList(true));
}
this.setEnablePrimingConnections(primeConnection);
} catch (Exception e) {
throw new RuntimeException(
......@@ -214,7 +214,7 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
}
private String getIdentifier() {
return niwsClientConfig.getClientName();
return this.getClientConfig().getClientName();
}
private void keepServerListUpdated() {
......
......@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.netflix.client.ClientFactory;
import com.netflix.config.DynamicBooleanProperty;
import com.netflix.config.DynamicDoubleProperty;
import com.netflix.config.DynamicPropertyFactory;
......@@ -144,7 +145,9 @@ public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLo
zone = zone.toLowerCase();
BaseLoadBalancer loadBalancer = balancers.get(zone);
if (loadBalancer == null) {
loadBalancer = new BaseLoadBalancer(this.getName() + "_" + zone, new AvailabilityFilteringRule(), this.getLoadBalancerStats());
// We need to create rule object for load balancer for each zone
IRule rule = cloneRule(this.getRule());
loadBalancer = new BaseLoadBalancer(this.getName() + "_" + zone, rule, this.getLoadBalancerStats());
BaseLoadBalancer prev = balancers.putIfAbsent(zone, loadBalancer);
if (prev != null) {
loadBalancer = prev;
......@@ -153,105 +156,29 @@ public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLo
return loadBalancer;
}
/**
* Choose a zone from a list of zones based on number of instances
*
* @param snapshot
* @param chooseFrom
* @return
*/
private static String randomChooseZone(Map<String, ZoneSnapshot> snapshot, Set<String> chooseFrom) {
if (chooseFrom == null || chooseFrom.size() == 0) {
return null;
}
String selectedZone = chooseFrom.iterator().next();
if (chooseFrom.size() == 1) {
return selectedZone;
}
int totalServerCount = 0;
for (String zone: chooseFrom) {
totalServerCount += snapshot.get(zone).getInstanceCount();
}
int index = random.nextInt(totalServerCount) + 1;
int sum = 0;
for (String zone: chooseFrom) {
sum += snapshot.get(zone).getInstanceCount();
if (index <= sum) {
selectedZone = zone;
break;
}
}
return selectedZone;
private IRule cloneRule(IRule toClone) {
IRule rule;
if (toClone == null) {
rule = new AvailabilityFilteringRule();
} else {
String ruleClass = toClone.getClass().getName();
try {
rule = (IRule) ClientFactory.instantiateInstanceWithClientConfig(ruleClass, this.getClientConfig());
} catch (Exception e) {
throw new RuntimeException("Unexpected exception creating rule for ZoneAwareLoadBalancer", e);
}
}
return rule;
}
private String chooseZone(Map<String, ZoneSnapshot> snapshot) {
// make a copy, this might be changed later
Set<String> availableZones = new HashSet<String>(snapshot.keySet());
if (availableZones == null || availableZones.size() == 0) {
return null;
}
if (availableZones.size() == 1) {
return availableZones.iterator().next();
}
Set<String> worstZones = new HashSet<String>();
double maxLoadPerServer = 0;
boolean limitedZoneAvailability = false;
if (triggeringLoad == null) {
triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
}
if (triggeringBlackoutPercentage == null) {
triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
}
for (Map.Entry<String, ZoneSnapshot> entry: snapshot.entrySet()) {
String zone = entry.getKey();
ZoneSnapshot zoneSnapshot = entry.getValue();
int instanceCount = zoneSnapshot.getInstanceCount();
if (instanceCount == 0) {
availableZones.remove(zone);
limitedZoneAvailability = true;
} else {
double loadPerServer = zoneSnapshot.getLoadPerServer();
if (((double) zoneSnapshot.getCircuitTrippedCount()) / instanceCount >= triggeringBlackoutPercentage.get()
|| loadPerServer < 0) {
availableZones.remove(zone);
limitedZoneAvailability = true;
} else {
if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
// they are the same considering double calculation round error
worstZones.add(zone);
} else if (loadPerServer > maxLoadPerServer) {
maxLoadPerServer = loadPerServer;
worstZones.clear();
worstZones.add(zone);
}
}
}
}
if (maxLoadPerServer < triggeringLoad.get() && !limitedZoneAvailability) {
// zone override is not needed here
return null;
}
String zoneToAvoid = randomChooseZone(snapshot, worstZones);
if (zoneToAvoid != null) {
availableZones.remove(zoneToAvoid);
}
return randomChooseZone(snapshot, availableZones);
}
@Override
public void setRule(IRule rule) {
super.setRule(rule);
if (balancers != null) {
for (String zone: balancers.keySet()) {
balancers.get(zone).setRule(rule);
balancers.get(zone).setRule(cloneRule(rule));
}
}
}
......
......@@ -44,7 +44,7 @@ public class SimpleRoundRobinLBTest {
IPing ping = new PingFake();
IRule rule = new RoundRobinRule();
lb = new BaseLoadBalancer(ping,rule);
lb.setPingInterval(5);
lb.setPingInterval(1);
lb.setMaxTotalPingTime(2);
// the setting of servers is done by a call to DiscoveryService
......@@ -54,10 +54,12 @@ public class SimpleRoundRobinLBTest {
}
lb.addServers(servers);
// make sure the ping cycle has kicked in and all servers are set to alive
try {
Thread.sleep(2000);
Thread.sleep(5000);
} catch (InterruptedException e) {
}
System.out.println(lb.getServerList(true));
}
/**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册