提交 2d6dfe02 编写于 作者: Y YunaiV

1. configurator

2. directory
3. loadbalance
4. merger
上级 8d35460c
......@@ -28,9 +28,13 @@ import java.util.Set;
/**
* AbstractOverrideConfigurator
*
* Configurator 抽象类,实现公用的配置规则的匹配、排序的逻辑
*/
public abstract class AbstractConfigurator implements Configurator {
/**
* 配置规则 URL
*/
private final URL configuratorUrl;
public AbstractConfigurator(URL url) {
......@@ -40,30 +44,34 @@ public abstract class AbstractConfigurator implements Configurator {
this.configuratorUrl = url;
}
public static void main(String[] args) {
System.out.println(URL.encode("timeout=100"));
}
@Override
public URL getUrl() {
return configuratorUrl;
}
@Override
public URL configure(URL url) {
if (configuratorUrl == null || configuratorUrl.getHost() == null
|| url == null || url.getHost() == null) {
if (configuratorUrl.getHost() == null || url == null || url.getHost() == null) {
return url;
}
// If override url has port, means it is a provider address. We want to control a specific provider with this override url, it may take effect on the specific provider instance or on consumers holding this provider instance.
// 配置规则,URL 带有端口( port ),意图是控制提供者机器。可以在提供端生效 也可以在消费端生效
if (configuratorUrl.getPort() != 0) {
if (url.getPort() == configuratorUrl.getPort()) {
return configureIfMatch(url.getHost(), url);
}
} else {// override url don't have a port, means the ip override url specify is a consumer address or 0.0.0.0
// override url don't have a port, means the ip override url specify is a consumer address or 0.0.0.0
// 配置规则,URL 没有端口,override 输入消费端地址 或者 0.0.0.0
} else {
// 1.If it is a consumer ip address, the intention is to control a specific consumer instance, it must takes effect at the consumer side, any provider received this override url should ignore;
// 2.If the ip is 0.0.0.0, this override url can be used on consumer, and also can be used on provider
// 1. 如果是消费端地址,则意图是控制消费者机器,必定在消费端生效,提供端忽略;
// 2. 如果是0.0.0.0可能是控制提供端,也可能是控制提供端
if (url.getParameter(Constants.SIDE_KEY, Constants.PROVIDER).equals(Constants.CONSUMER)) {
// NetUtils.getLocalHost是消费端注册到zk的消费者地址
return configureIfMatch(NetUtils.getLocalHost(), url);// NetUtils.getLocalHost is the ip address consumer registered to registry.
} else if (url.getParameter(Constants.SIDE_KEY, Constants.CONSUMER).equals(Constants.PROVIDER)) {
// 控制所有提供端,地址必定是0.0.0.0,否则就要配端口从而执行上面的if分支了
return configureIfMatch(Constants.ANYHOST_VALUE, url);// take effect on all providers, so address must be 0.0.0.0, otherwise it won't flow to this if branch
}
}
......@@ -71,34 +79,41 @@ public abstract class AbstractConfigurator implements Configurator {
}
private URL configureIfMatch(String host, URL url) {
// 匹配 Host
if (Constants.ANYHOST_VALUE.equals(configuratorUrl.getHost()) || host.equals(configuratorUrl.getHost())) {
String configApplication = configuratorUrl.getParameter(Constants.APPLICATION_KEY,
configuratorUrl.getUsername());
// 匹配 "application"
String configApplication = configuratorUrl.getParameter(Constants.APPLICATION_KEY, configuratorUrl.getUsername()); // TODO 8038 芋艿,configuratorUrl.getUsername() 为啥 username
String currentApplication = url.getParameter(Constants.APPLICATION_KEY, url.getUsername());
if (configApplication == null || Constants.ANY_VALUE.equals(configApplication)
|| configApplication.equals(currentApplication)) {
Set<String> condtionKeys = new HashSet<String>();
condtionKeys.add(Constants.CATEGORY_KEY);
condtionKeys.add(Constants.CHECK_KEY);
condtionKeys.add(Constants.DYNAMIC_KEY);
condtionKeys.add(Constants.ENABLED_KEY);
// 配置 URL 中的条件 KEYS 集合。其中下面四个 KEY ,不算是条件,而是内置属性。考虑到下面要移除,所以添加到该集合中。
Set<String> conditionKeys = new HashSet<String>();
conditionKeys.add(Constants.CATEGORY_KEY);
conditionKeys.add(Constants.CHECK_KEY);
conditionKeys.add(Constants.DYNAMIC_KEY);
conditionKeys.add(Constants.ENABLED_KEY);
// 判断传入的 url 是否匹配配置规则 URL 的条件。除了 "application" 和 "side" 之外,带有 `"~"` 开头的 KEY ,也是条件。
for (Map.Entry<String, String> entry : configuratorUrl.getParameters().entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
if (key.startsWith("~") || Constants.APPLICATION_KEY.equals(key) || Constants.SIDE_KEY.equals(key)) {
condtionKeys.add(key);
conditionKeys.add(key);
// 若不相等,则不匹配配置规则,直接返回
if (value != null && !Constants.ANY_VALUE.equals(value)
&& !value.equals(url.getParameter(key.startsWith("~") ? key.substring(1) : key))) {
return url;
}
}
}
return doConfigure(url, configuratorUrl.removeParameters(condtionKeys));
// 移除条件 KEYS 集合,并配置到 URL 中
return doConfigure(url, configuratorUrl.removeParameters(conditionKeys));
}
}
return url;
}
protected abstract URL doConfigure(URL currentUrl, URL configUrl);
/**
* Sort by host, priority
* 1. the url with a specific host ip should have higher priority than 0.0.0.0
......@@ -107,15 +122,26 @@ public abstract class AbstractConfigurator implements Configurator {
* @param o
* @return
*/
/**
* 根据 host、priority 依次排序
*
* 1. 特定 host 优先级高于 anyhost 0.0.0.0
* 2. priority值越大,优先级越高;
*
* @param o
* @return
*/
@Override
public int compareTo(Configurator o) {
if (o == null) {
return -1;
}
// host 升序
int ipCompare = getUrl().getHost().compareTo(o.getUrl().getHost());
// 若 host 相同,按照 priority 降序
if (ipCompare == 0) {//host is the same, sort by priority
int i = getUrl().getParameter(Constants.PRIORITY_KEY, 0),
j = o.getUrl().getParameter(Constants.PRIORITY_KEY, 0);
int i = getUrl().getParameter(Constants.PRIORITY_KEY, 0);
int j = o.getUrl().getParameter(Constants.PRIORITY_KEY, 0);
if (i < j) {
return -1;
} else if (i > j) {
......@@ -126,10 +152,11 @@ public abstract class AbstractConfigurator implements Configurator {
} else {
return ipCompare;
}
}
public static void main(String[] args) {
System.out.println(URL.encode("timeout=100"));
}
protected abstract URL doConfigure(URL currentUrl, URL configUrl);
}
......@@ -29,8 +29,9 @@ public class AbsentConfigurator extends AbstractConfigurator {
super(url);
}
@Override
public URL doConfigure(URL currentUrl, URL configUrl) {
return currentUrl.addParametersIfAbsent(configUrl.getParameters());
return currentUrl.addParametersIfAbsent(configUrl.getParameters()); // 不存在时添加
}
}
}
\ No newline at end of file
......@@ -26,8 +26,9 @@ import com.alibaba.dubbo.rpc.cluster.ConfiguratorFactory;
*/
public class AbsentConfiguratorFactory implements ConfiguratorFactory {
@Override
public Configurator getConfigurator(URL url) {
return new AbsentConfigurator(url);
}
}
}
\ No newline at end of file
......@@ -29,8 +29,9 @@ public class OverrideConfigurator extends AbstractConfigurator {
super(url);
}
@Override
public URL doConfigure(URL currentUrl, URL configUrl) {
return currentUrl.addParameters(configUrl.getParameters());
return currentUrl.addParameters(configUrl.getParameters()); // 覆盖添加
}
}
}
\ No newline at end of file
......@@ -23,11 +23,13 @@ import com.alibaba.dubbo.rpc.cluster.ConfiguratorFactory;
/**
* AbsentConfiguratorFactory
*
* OverrideConfigurator 工厂
*/
public class OverrideConfiguratorFactory implements ConfiguratorFactory {
@Override
public Configurator getConfigurator(URL url) {
return new OverrideConfigurator(url);
}
}
}
\ No newline at end of file
......@@ -35,20 +35,31 @@ import java.util.List;
/**
* Abstract implementation of Directory: Invoker list returned from this Directory's list method have been filtered by Routers
*
* Directory 抽象实现类, TODO 芋艿,优化注释 增加router的Directory
* <p>
* Directory 抽象实现类,实现了公用的路由规则的逻辑
*/
public abstract class AbstractDirectory<T> implements Directory<T> {
// logger
private static final Logger logger = LoggerFactory.getLogger(AbstractDirectory.class);
private final URL url;
/**
* 是否已经销毁
*/
private volatile boolean destroyed = false;
/**
* 注册中心 URL
*/
private final URL url;
/**
* 消费者 URL
*
* 若未显示调用 {@link #AbstractDirectory(URL, URL, List)} 构造方法,consumerUrl 等于 {@link #url}
*/
private volatile URL consumerUrl;
/**
* Router 数组
*/
private volatile List<Router> routers;
public AbstractDirectory(URL url) {
......@@ -60,19 +71,24 @@ public abstract class AbstractDirectory<T> implements Directory<T> {
}
public AbstractDirectory(URL url, URL consumerUrl, List<Router> routers) {
if (url == null)
if (url == null) {
throw new IllegalArgumentException("url == null");
}
this.url = url;
this.consumerUrl = consumerUrl;
// 设置 Router 数组
setRouters(routers);
}
@Override
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (destroyed) {
throw new RpcException("Directory already destroyed .url: " + getUrl());
}
// 获得所有 Invoker 集合
List<Invoker<T>> invokers = doList(invocation);
List<Router> localRouters = this.routers; // local reference
// 根据路由规则,筛选 Invoker 集合
List<Router> localRouters = this.routers; // local reference 本地引用,避免并发问题
if (localRouters != null && !localRouters.isEmpty()) {
for (Router router : localRouters) {
try {
......@@ -87,14 +103,41 @@ public abstract class AbstractDirectory<T> implements Directory<T> {
return invokers;
}
/**
* 获得所有 Invoker 集合
*
* @param invocation Invocation 对象
* @return Invoker 集合
*/
protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;
@Override
public URL getUrl() {
return url;
}
public URL getConsumerUrl() {
return consumerUrl;
}
public void setConsumerUrl(URL consumerUrl) {
this.consumerUrl = consumerUrl;
}
public boolean isDestroyed() {
return destroyed;
}
@Override
public void destroy() {
destroyed = true;
}
public List<Router> getRouters() {
return routers;
}
// 【TODO 8036】
protected void setRouters(List<Router> routers) {
// copy list
routers = routers == null ? new ArrayList<Router>() : new ArrayList<Router>(routers);
......@@ -110,22 +153,4 @@ public abstract class AbstractDirectory<T> implements Directory<T> {
this.routers = routers;
}
public URL getConsumerUrl() {
return consumerUrl;
}
public void setConsumerUrl(URL consumerUrl) {
this.consumerUrl = consumerUrl;
}
public boolean isDestroyed() {
return destroyed;
}
public void destroy() {
destroyed = true;
}
protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;
}
\ No newline at end of file
......@@ -31,6 +31,9 @@ import java.util.List;
*/
public class StaticDirectory<T> extends AbstractDirectory<T> {
/**
* Invoker 集合
*/
private final List<Invoker<T>> invokers;
public StaticDirectory(List<Invoker<T>> invokers) {
......@@ -46,20 +49,26 @@ public class StaticDirectory<T> extends AbstractDirectory<T> {
}
public StaticDirectory(URL url, List<Invoker<T>> invokers, List<Router> routers) {
// 默认使用 `url` 参数。当它为空时,使用 `invokers[0].url` 。
super(url == null && invokers != null && !invokers.isEmpty() ? invokers.get(0).getUrl() : url, routers);
if (invokers == null || invokers.isEmpty())
if (invokers == null || invokers.isEmpty()) {
throw new IllegalArgumentException("invokers == null");
}
this.invokers = invokers;
}
@Override
public Class<T> getInterface() {
return invokers.get(0).getInterface();
}
@Override
public boolean isAvailable() {
// 若已经销毁,则不可用
if (isDestroyed()) {
return false;
}
// 任一一个 Invoker 可用,则为可用
for (Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) {
return true;
......@@ -68,14 +77,19 @@ public class StaticDirectory<T> extends AbstractDirectory<T> {
return false;
}
@Override
public void destroy() {
// 若已经销毁, 跳过
if (isDestroyed()) {
return;
}
// 销毁
super.destroy();
// 销毁每个 Invoker
for (Invoker<T> invoker : invokers) {
invoker.destroy();
}
// 清空 Invoker 集合
invokers.clear();
}
......
......@@ -27,31 +27,41 @@ import java.util.List;
/**
* AbstractLoadBalance
*
* LoadBalance 抽象类,提供了权重计算的功能
*/
public abstract class AbstractLoadBalance implements LoadBalance {
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
// 计算权重
int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
// 权重范围为 [0, weight] 之间
return ww < 1 ? 1 : (ww > weight ? weight : ww);
}
@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (invokers == null || invokers.isEmpty())
if (invokers == null || invokers.isEmpty()) {
return null;
if (invokers.size() == 1)
}
if (invokers.size() == 1) {
return invokers.get(0);
}
return doSelect(invokers, url, invocation);
}
protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
// 获得 weight 配置,即服务权重。默认为 100
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
if (weight > 0) {
long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
// 获得启动总时长
int uptime = (int) (System.currentTimeMillis() - timestamp);
// 获得预热需要总时长。默认为 10 * 60 * 1000 = 10 分钟
int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
// 处于预热中,计算当前的权重
if (uptime > 0 && uptime < warmup) {
weight = calculateWarmupWeight(uptime, warmup, weight);
}
......
......@@ -33,16 +33,25 @@ import java.util.concurrent.ConcurrentMap;
/**
* ConsistentHashLoadBalance
*
* 一致性 Hash,相同参数的请求总是发到同一提供者。
* 当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。
*/
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
/**
* 服务方法与一致性哈希选择器的映射
*
* KEY:serviceKey + "." + methodName
*/
private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
@SuppressWarnings("unchecked")
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
// 基于 invokers 集合,根据对象内存地址来计算定义哈希值
int identityHashCode = System.identityHashCode(invokers);
// 获得 ConsistentHashSelector 对象。若为空,或者定义哈希值变更(说明 invokers 集合发生变化),进行创建新的 ConsistentHashSelector 对象
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
if (selector == null || selector.identityHashCode != identityHashCode) {
selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));
......@@ -53,29 +62,46 @@ public class ConsistentHashLoadBalance extends AbstractLoadBalance {
private static final class ConsistentHashSelector<T> {
/**
* 虚拟节点与 Invoker 的映射关系
*/
private final TreeMap<Long, Invoker<T>> virtualInvokers;
/**
* 每个Invoker 对应的虚拟节点数
*/
private final int replicaNumber;
/**
* 定义哈希值
*/
private final int identityHashCode;
/**
* 取值参数位置数组
*/
private final int[] argumentIndex;
ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
// 设置 identityHashCode
this.identityHashCode = identityHashCode;
URL url = invokers.get(0).getUrl();
// 初始化 replicaNumber
this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
// 初始化 argumentIndex
String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
// 初始化 virtualInvokers
for (Invoker<T> invoker : invokers) {
String address = invoker.getUrl().getAddress();
// 每四个虚拟结点为一组,为什么这样?下面会说到
for (int i = 0; i < replicaNumber / 4; i++) {
// 这组虚拟结点得到惟一名称
byte[] digest = md5(address + i);
// Md5是一个16字节长度的数组,将16字节的数组每四个字节一组,分别对应一个虚拟结点,这就是为什么上面把虚拟结点四个划分一组的原因
for (int h = 0; h < 4; h++) {
// 对于每四个字节,组成一个long值数值,做为这个虚拟节点的在环中的惟一key
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
......@@ -84,8 +110,11 @@ public class ConsistentHashLoadBalance extends AbstractLoadBalance {
}
public Invoker<T> select(Invocation invocation) {
// 基于方法参数,获得 KEY
String key = toKey(invocation.getArguments());
// 计算 MD5 值
byte[] digest = md5(key);
// 计算 KEY 值
return selectForKey(hash(digest, 0));
}
......@@ -100,10 +129,13 @@ public class ConsistentHashLoadBalance extends AbstractLoadBalance {
}
private Invoker<T> selectForKey(long hash) {
// 得到大于当前 key 的那个子 Map ,然后从中取出第一个 key ,就是大于且离它最近的那个 key
Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry();
// 不存在,则取 virtualInvokers 第一个
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
// 存在,则返回
return entry.getValue();
}
......@@ -115,6 +147,7 @@ public class ConsistentHashLoadBalance extends AbstractLoadBalance {
& 0xFFFFFFFFL;
}
// 计算 MD5
private byte[] md5(String value) {
MessageDigest md5;
try {
......
......@@ -27,7 +27,9 @@ import java.util.Random;
/**
* LeastActiveLoadBalance
*
* <p>
* 最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。
* 使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。
*/
public class LeastActiveLoadBalance extends AbstractLoadBalance {
......@@ -35,52 +37,55 @@ public class LeastActiveLoadBalance extends AbstractLoadBalance {
private final Random random = new Random();
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // Number of invokers
int leastActive = -1; // The least active value of all invokers
int leastCount = 0; // The number of invokers having the same least active value (leastActive)
int[] leastIndexs = new int[length]; // The index of invokers having the same least active value (leastActive)
int totalWeight = 0; // The sum of weights
int firstWeight = 0; // Initial value, used for comparision
boolean sameWeight = true; // Every invoker has the same weight value?
int length = invokers.size(); // 总个数
int leastActive = -1; // 最小的活跃数
int leastCount = 0; // 相同最小活跃数的个数
int[] leastIndexes = new int[length]; // 相同最小活跃数的下标
int totalWeight = 0; // 总权重
int firstWeight = 0; // 第一个权重,用于于计算是否相同
boolean sameWeight = true; // 是否所有权重相同
// 计算获得相同最小活跃数的数组和个数
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // Weight
if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value.
leastActive = active; // Record the current least active value
leastCount = 1; // Reset leastCount, count again based on current leastCount
leastIndexs[0] = i; // Reset
totalWeight = weight; // Reset
firstWeight = weight; // Record the weight the first invoker
sameWeight = true; // Reset, every invoker has the same weight value?
} else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating.
leastIndexs[leastCount++] = i; // Record index number of this invoker
totalWeight += weight; // Add this invoker's weight to totalWeight.
// If every invoker has the same weight?
if (sameWeight && i > 0
&& weight != firstWeight) {
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // 活跃数
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // 权重
if (leastActive == -1 || active < leastActive) { // 发现更小的活跃数,重新开始
leastActive = active; // 记录最小活跃数
leastCount = 1; // 重新统计相同最小活跃数的个数
leastIndexes[0] = i; // 重新记录最小活跃数下标
totalWeight = weight; // 重新累计总权重
firstWeight = weight; // 记录第一个权重
sameWeight = true; // 还原权重相同标识
} else if (active == leastActive) { // 累计相同最小的活跃数
leastIndexes[leastCount++] = i; // 累计相同最小活跃数下标
totalWeight += weight; // 累计总权重
// 判断所有权重是否一样
if (sameWeight && weight != firstWeight) {
sameWeight = false;
}
}
}
// assert(leastCount > 0)
if (leastCount == 1) {
// If we got exactly one invoker having the least active value, return this invoker directly.
return invokers.get(leastIndexs[0]);
// 如果只有一个最小则直接返回
return invokers.get(leastIndexes[0]);
}
if (!sameWeight && totalWeight > 0) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
// 如果权重不相同且权重大于0则按总权重数随机
int offsetWeight = random.nextInt(totalWeight);
// Return a invoker based on the random value.
// 并确定随机值落在哪个片断上
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexs[i];
int leastIndex = leastIndexes[i];
offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
if (offsetWeight <= 0)
if (offsetWeight <= 0) {
return invokers.get(leastIndex);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(leastIndexs[random.nextInt(leastCount)]);
// 如果权重相同或权重为0则均等随机
return invokers.get(leastIndexes[random.nextInt(leastCount)]);
}
}
\ No newline at end of file
......@@ -26,6 +26,8 @@ import java.util.Random;
/**
* random load balance.
*
* 随机,按权重设置随机概率。
* 在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。
*/
public class RandomLoadBalance extends AbstractLoadBalance {
......@@ -33,29 +35,34 @@ public class RandomLoadBalance extends AbstractLoadBalance {
private final Random random = new Random();
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // Number of invokers
int totalWeight = 0; // The sum of weights
boolean sameWeight = true; // Every invoker has the same weight?
// 计算总权限
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
int weight = getWeight(invokers.get(i), invocation); // 获得权重
totalWeight += weight; // Sum
if (sameWeight && i > 0
&& weight != getWeight(invokers.get(i - 1), invocation)) {
if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) {
sameWeight = false;
}
}
// 权重不相等,随机后,判断在哪个 Invoker 的权重区间中
if (totalWeight > 0 && !sameWeight) {
// 随机
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
int offset = random.nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < length; i++) {
offset -= getWeight(invokers.get(i), invocation);
// 区间判断
for (Invoker<T> invoker : invokers) {
offset -= getWeight(invoker, invocation);
if (offset < 0) {
return invokers.get(i);
return invoker;
}
}
}
// 权重相等,平均随机
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(random.nextInt(length));
}
......
......@@ -30,13 +30,21 @@ import java.util.concurrent.ConcurrentMap;
/**
* Round robin load balance.
*
* 轮循,按公约后的权重设置轮循比率。
* 存在慢的提供者累积请求的问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。
*/
public class RoundRobinLoadBalance extends AbstractLoadBalance {
public static final String NAME = "roundrobin";
/**
* 服务方法与计数器的映射
*
* KEY:serviceKey + "." + methodName
*/
private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
int length = invokers.size(); // Number of invokers
......@@ -44,6 +52,7 @@ public class RoundRobinLoadBalance extends AbstractLoadBalance {
int minWeight = Integer.MAX_VALUE; // The minimum weight
final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
int weightSum = 0;
// 计算最小、最大权重,总的权重和。
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
......@@ -53,21 +62,26 @@ public class RoundRobinLoadBalance extends AbstractLoadBalance {
weightSum += weight;
}
}
// 获得 AtomicPositiveInteger 对象
AtomicPositiveInteger sequence = sequences.get(key);
if (sequence == null) {
sequences.putIfAbsent(key, new AtomicPositiveInteger());
sequence = sequences.get(key);
}
// 获得当前顺序号,并递增 + 1
int currentSequence = sequence.getAndIncrement();
// 权重不相等,顺序根据权重分配
if (maxWeight > 0 && minWeight < maxWeight) {
int mod = currentSequence % weightSum;
for (int i = 0; i < maxWeight; i++) {
for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
int mod = currentSequence % weightSum; // 剩余权重
for (int i = 0; i < maxWeight; i++) { // 循环最大权重
for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) { // 循环 Invoker 集合
final Invoker<T> k = each.getKey();
final IntegerWrapper v = each.getValue();
// 剩余权重归 0 ,当前 Invoker 还有剩余权重,返回该 Invoker 对象
if (mod == 0 && v.getValue() > 0) {
return k;
}
// 若 Invoker 还有权重值,扣除它( value )和剩余权重( mod )。
if (v.getValue() > 0) {
v.decrement();
mod--;
......@@ -75,11 +89,16 @@ public class RoundRobinLoadBalance extends AbstractLoadBalance {
}
}
}
// 权重相等,平均顺序获得
// Round robin
return invokers.get(currentSequence % length);
}
private static final class IntegerWrapper {
/**
* 权重值
*/
private int value;
public IntegerWrapper(int value) {
......@@ -94,9 +113,13 @@ public class RoundRobinLoadBalance extends AbstractLoadBalance {
this.value = value;
}
/**
* 扣除一
*/
public void decrement() {
this.value--;
}
}
}
\ No newline at end of file
......@@ -56,4 +56,4 @@ public class ArrayMerger implements Merger<Object[]> {
return (Object[]) result;
}
}
}
\ No newline at end of file
......@@ -21,6 +21,7 @@ import com.alibaba.dubbo.rpc.cluster.Merger;
public class BooleanArrayMerger implements Merger<boolean[]> {
@Override
public boolean[] merge(boolean[]... items) {
int totalLen = 0;
for (boolean[] array : items) {
......
......@@ -21,6 +21,7 @@ import com.alibaba.dubbo.rpc.cluster.Merger;
public class ByteArrayMerger implements Merger<byte[]> {
@Override
public byte[] merge(byte[]... items) {
int total = 0;
for (byte[] array : items) {
......
......@@ -21,6 +21,7 @@ import com.alibaba.dubbo.rpc.cluster.Merger;
public class CharArrayMerger implements Merger<char[]> {
@Override
public char[] merge(char[]... items) {
int total = 0;
for (char[] array : items) {
......
......@@ -21,6 +21,7 @@ import com.alibaba.dubbo.rpc.cluster.Merger;
public class DoubleArrayMerger implements Merger<double[]> {
@Override
public double[] merge(double[]... items) {
int total = 0;
for (double[] array : items) {
......
......@@ -21,6 +21,7 @@ import com.alibaba.dubbo.rpc.cluster.Merger;
public class FloatArrayMerger implements Merger<float[]> {
@Override
public float[] merge(float[]... items) {
int total = 0;
for (float[] array : items) {
......
......@@ -21,6 +21,7 @@ import com.alibaba.dubbo.rpc.cluster.Merger;
public class IntArrayMerger implements Merger<int[]> {
@Override
public int[] merge(int[]... items) {
int totalLen = 0;
for (int[] item : items) {
......
......@@ -24,6 +24,7 @@ import java.util.List;
public class ListMerger implements Merger<List<?>> {
@Override
public List<Object> merge(List<?>... items) {
List<Object> result = new ArrayList<Object>();
for (List<?> item : items) {
......
......@@ -21,6 +21,7 @@ import com.alibaba.dubbo.rpc.cluster.Merger;
public class LongArrayMerger implements Merger<long[]> {
@Override
public long[] merge(long[]... items) {
int total = 0;
for (long[] array : items) {
......
......@@ -21,13 +21,19 @@ import com.alibaba.dubbo.rpc.cluster.Merger;
import java.util.HashMap;
import java.util.Map;
/**
* Map Merger 实现类
*/
public class MapMerger implements Merger<Map<?, ?>> {
@Override
public Map<?, ?> merge(Map<?, ?>... items) {
if (items.length == 0) {
return null;
}
// 创建结果 Map
Map<Object, Object> result = new HashMap<Object, Object>();
// 合并多个 Map
for (Map<?, ?> item : items) {
if (item != null) {
result.putAll(item);
......@@ -36,4 +42,4 @@ public class MapMerger implements Merger<Map<?, ?>> {
return result;
}
}
}
\ No newline at end of file
......@@ -25,24 +25,34 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Merger 工厂类
*/
public class MergerFactory {
private static final ConcurrentMap<Class<?>, Merger<?>> mergerCache =
new ConcurrentHashMap<Class<?>, Merger<?>>();
/**
* Merger 对象缓存
*/
private static final ConcurrentMap<Class<?>, Merger<?>> mergerCache = new ConcurrentHashMap<Class<?>, Merger<?>>();
public static <T> Merger<T> getMerger(Class<T> returnType) {
Merger result;
// 数组类型
if (returnType.isArray()) {
Class type = returnType.getComponentType();
// 从缓存中获得 Merger 对象
result = mergerCache.get(type);
if (result == null) {
loadMergers();
result = mergerCache.get(type);
}
// 获取不到,使用 ArrayMerger
if (result == null && !type.isPrimitive()) {
result = ArrayMerger.INSTANCE;
}
// 普通类型
} else {
// 从缓存中获得 Merger 对象
result = mergerCache.get(returnType);
if (result == null) {
loadMergers();
......@@ -52,13 +62,15 @@ public class MergerFactory {
return result;
}
/**
* 初始化所有的 Merger 拓展对象,到 mergerCache 缓存中。
*/
static void loadMergers() {
Set<String> names = ExtensionLoader.getExtensionLoader(Merger.class)
.getSupportedExtensions();
Set<String> names = ExtensionLoader.getExtensionLoader(Merger.class).getSupportedExtensions();
for (String name : names) {
Merger m = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(name);
mergerCache.putIfAbsent(ReflectUtils.getGenericClass(m.getClass()), m);
}
}
}
}
\ No newline at end of file
......@@ -23,16 +23,15 @@ import java.util.Set;
public class SetMerger implements Merger<Set<?>> {
@Override
public Set<Object> merge(Set<?>... items) {
Set<Object> result = new HashSet<Object>();
for (Set<?> item : items) {
if (item != null) {
result.addAll(item);
}
}
return result;
}
}
}
\ No newline at end of file
......@@ -19,14 +19,21 @@ package com.alibaba.dubbo.rpc.cluster.merger;
import com.alibaba.dubbo.rpc.cluster.Merger;
/**
* Short 数组 Merger 实现类
*/
public class ShortArrayMerger implements Merger<short[]> {
@Override
public short[] merge(short[]... items) {
// 计算合并后的数组大小
int total = 0;
for (short[] array : items) {
total += array.length;
}
// 创建结果数组
short[] result = new short[total];
// 合并多个数组
int index = 0;
for (short[] array : items) {
for (short item : array) {
......@@ -35,4 +42,5 @@ public class ShortArrayMerger implements Merger<short[]> {
}
return result;
}
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册