提交 2d425302 编写于 作者: R RicardoZ

[refactor] Added consistent hash load balancing mode.

上级 65b3a177
......@@ -72,7 +72,7 @@
- [x] Netty 重用 Channel 避免重复连接服务端
- [x] 使用 `CompletableFuture` 包装接受客户端返回结果(之前的实现是通过 `AttributeMap` 绑定到 Channel 上实现的) 详见:[使用 CompletableFuture 优化接受服务提供端返回结果](./docs/使用CompletableFuture优化接受服务提供端返回结果.md)
- [x] **增加 Netty 心跳机制** : 保证客户端和服务端的连接不被断掉,避免重连。
- [x] **客户端调用远程服务的时候进行负载均衡** :调用服务的时候,从很多服务地址中根据相应的负载均衡算法选取一个服务地址。ps:目前只实现了随机负载均衡算法。
- [x] **客户端调用远程服务的时候进行负载均衡** :调用服务的时候,从很多服务地址中根据相应的负载均衡算法选取一个服务地址。ps:目前实现了随机负载均衡算法与一致性哈希算法。
- [x] **处理一个接口有多个类实现的情况** :对服务分组,发布服务的时候增加一个 group 参数即可。
- [x] **集成 Spring 通过注解注册服务**
- [x] **集成 Spring 通过注解进行服务消费** 。参考: [PR#10](https://github.com/Snailclimb/guide-rpc-framework/pull/10)
......
......@@ -10,16 +10,16 @@ import java.util.List;
*/
public abstract class AbstractLoadBalance implements LoadBalance {
@Override
public String selectServiceAddress(List<String> serviceAddresses) {
public String selectServiceAddress(List<String> serviceAddresses, String rpcServiceName) {
if (serviceAddresses == null || serviceAddresses.size() == 0) {
return null;
}
if (serviceAddresses.size() == 1) {
return serviceAddresses.get(0);
}
return doSelect(serviceAddresses);
return doSelect(serviceAddresses, rpcServiceName);
}
protected abstract String doSelect(List<String> serviceAddresses);
protected abstract String doSelect(List<String> serviceAddresses, String rpcServiceName);
}
package github.javaguide.loadbalance;
import github.javaguide.extension.SPI;
import java.util.List;
/**
......@@ -8,6 +10,7 @@ import java.util.List;
* @author shuang.kou
* @createTime 2020年06月21日 07:44:00
*/
@SPI
public interface LoadBalance {
/**
* Choose one from the list of existing service addresses list
......@@ -15,5 +18,5 @@ public interface LoadBalance {
* @param serviceAddresses Service address list
* @return target service address
*/
String selectServiceAddress(List<String> serviceAddresses);
String selectServiceAddress(List<String> serviceAddresses, String rpcServiceName);
}
package github.javaguide.loadbalance.loadbalancer;
import github.javaguide.loadbalance.AbstractLoadBalance;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
/**
* refer to dubbo consistent hash load balance: http://dubbo.apache.org/zh-cn/blog/dubbo-consistent-hash-implementation.html
* @author RicardoZ
* @createTime 2020年10月20日 18:15:20
*/
@Slf4j
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
@Override
protected String doSelect(List<String> serviceAddresses, String rpcServiceName) {
try {
ConsistentHashSelector consistentHashSelector = new ConsistentHashSelector(serviceAddresses, 160);
return consistentHashSelector.select(rpcServiceName);
} catch (NoSuchAlgorithmException e) {
log.error("An encryption algorithm that does not exist is used: ", e);
e.printStackTrace();
}
return null;
}
static class ConsistentHashSelector {
private final TreeMap<Long, String> virtualInvokers;
ConsistentHashSelector(List<String> invokers, int replicaNumber) throws NoSuchAlgorithmException {
this.virtualInvokers = new TreeMap<>();
for (String invoker : invokers) {
for (int i = 0; i < replicaNumber / 4; i++) {
byte[] digest = md5(invoker + i);
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
}
static byte[] md5(String key) throws NoSuchAlgorithmException {
MessageDigest md = MessageDigest.getInstance("md5");
byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
md.update(bytes);
return md.digest();
}
static long hash(byte[] digest, int idx) {
return ((long) (digest[3 + idx * 4] & 255) << 24 | (long) (digest[2 + idx * 4] & 255) << 16 | (long) (digest[1 + idx * 4] & 255) << 8 | (long) (digest[idx * 4] & 255)) & 4294967295L;
}
public String select(String rpcServiceName) throws NoSuchAlgorithmException {
byte[] digest = md5(rpcServiceName);
return selectForKey(hash(digest, 0));
}
public String selectForKey(long hashCode) {
Map.Entry<Long, String> entry = virtualInvokers.tailMap(hashCode, true).firstEntry();
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
return entry.getValue();
}
}
}
package github.javaguide.loadbalance;
package github.javaguide.loadbalance.loadbalancer;
import github.javaguide.loadbalance.AbstractLoadBalance;
import java.util.List;
import java.util.Random;
......@@ -11,7 +13,7 @@ import java.util.Random;
*/
public class RandomLoadBalance extends AbstractLoadBalance {
@Override
protected String doSelect(List<String> serviceAddresses) {
protected String doSelect(List<String> serviceAddresses, String rpcServiceName) {
Random random = new Random();
return serviceAddresses.get(random.nextInt(serviceAddresses.size()));
}
......
......@@ -2,8 +2,8 @@ package github.javaguide.registry.zk;
import github.javaguide.enums.RpcErrorMessageEnum;
import github.javaguide.exception.RpcException;
import github.javaguide.extension.ExtensionLoader;
import github.javaguide.loadbalance.LoadBalance;
import github.javaguide.loadbalance.RandomLoadBalance;
import github.javaguide.registry.ServiceDiscovery;
import github.javaguide.registry.zk.util.CuratorUtils;
import lombok.extern.slf4j.Slf4j;
......@@ -23,7 +23,7 @@ public class ZkServiceDiscovery implements ServiceDiscovery {
private final LoadBalance loadBalance;
public ZkServiceDiscovery() {
this.loadBalance = new RandomLoadBalance();
this.loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("loadBalance");
}
@Override
......@@ -34,7 +34,7 @@ public class ZkServiceDiscovery implements ServiceDiscovery {
throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND, rpcServiceName);
}
// load balancing
String targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList);
String targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList, rpcServiceName);
log.info("Successfully found the service address:[{}]", targetServiceUrl);
String[] socketAddressArray = targetServiceUrl.split(":");
String host = socketAddressArray[0];
......
loadBalance=github.javaguide.loadbalance.loadbalancer.ConsistentHashLoadBalance
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册