提交 c4e912c6 编写于 作者: Hpatron's avatar Hpatron

Fix consistency hash bug

上级 91cd884e
package github.javaguide.loadbalance;
import github.javaguide.remoting.dto.RpcRequest;
import java.util.List;
/**
......@@ -10,16 +12,16 @@ import java.util.List;
*/
public abstract class AbstractLoadBalance implements LoadBalance {
@Override
public String selectServiceAddress(List<String> serviceAddresses, String rpcServiceName) {
public String selectServiceAddress(List<String> serviceAddresses, RpcRequest rpcRequest) {
if (serviceAddresses == null || serviceAddresses.size() == 0) {
return null;
}
if (serviceAddresses.size() == 1) {
return serviceAddresses.get(0);
}
return doSelect(serviceAddresses, rpcServiceName);
return doSelect(serviceAddresses, rpcRequest);
}
protected abstract String doSelect(List<String> serviceAddresses, String rpcServiceName);
protected abstract String doSelect(List<String> serviceAddresses, RpcRequest rpcRequest);
}
package github.javaguide.loadbalance;
import github.javaguide.extension.SPI;
import github.javaguide.remoting.dto.RpcRequest;
import java.util.List;
......@@ -18,5 +19,5 @@ public interface LoadBalance {
* @param serviceAddresses Service address list
* @return target service address
*/
String selectServiceAddress(List<String> serviceAddresses, String rpcServiceName);
String selectServiceAddress(List<String> serviceAddresses, RpcRequest rpcRequest);
}
package github.javaguide.loadbalance.loadbalancer;
import github.javaguide.loadbalance.AbstractLoadBalance;
import github.javaguide.remoting.dto.RpcRequest;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
......@@ -22,8 +24,10 @@ public class ConsistentHashLoadBalance extends AbstractLoadBalance {
private final ConcurrentHashMap<String, ConsistentHashSelector> selectors = new ConcurrentHashMap<>();
@Override
protected String doSelect(List<String> serviceAddresses, String rpcServiceName) {
protected String doSelect(List<String> serviceAddresses, RpcRequest rpcRequest) {
int identityHashCode = System.identityHashCode(serviceAddresses);
// build rpc service name by rpcRequest
String rpcServiceName = rpcRequest.toRpcProperties().toRpcServiceName();
ConsistentHashSelector selector = selectors.get(rpcServiceName);
......@@ -33,7 +37,7 @@ public class ConsistentHashLoadBalance extends AbstractLoadBalance {
selector = selectors.get(rpcServiceName);
}
return selector.select(rpcServiceName);
return selector.select(rpcServiceName+ Arrays.stream(rpcRequest.getParameters()));
}
static class ConsistentHashSelector {
......@@ -73,8 +77,8 @@ public class ConsistentHashLoadBalance extends AbstractLoadBalance {
return ((long) (digest[3 + idx * 4] & 255) << 24 | (long) (digest[2 + idx * 4] & 255) << 16 | (long) (digest[1 + idx * 4] & 255) << 8 | (long) (digest[idx * 4] & 255)) & 4294967295L;
}
public String select(String rpcServiceName) {
byte[] digest = md5(rpcServiceName);
public String select(String rpcServiceKey) {
byte[] digest = md5(rpcServiceKey);
return selectForKey(hash(digest, 0));
}
......
package github.javaguide.loadbalance.loadbalancer;
import github.javaguide.loadbalance.AbstractLoadBalance;
import github.javaguide.remoting.dto.RpcRequest;
import java.util.List;
import java.util.Random;
......@@ -13,7 +14,7 @@ import java.util.Random;
*/
public class RandomLoadBalance extends AbstractLoadBalance {
@Override
protected String doSelect(List<String> serviceAddresses, String rpcServiceName) {
protected String doSelect(List<String> serviceAddresses, RpcRequest rpcRequest) {
Random random = new Random();
return serviceAddresses.get(random.nextInt(serviceAddresses.size()));
}
......
package github.javaguide.registry;
import github.javaguide.extension.SPI;
import github.javaguide.remoting.dto.RpcRequest;
import java.net.InetSocketAddress;
......@@ -15,8 +16,8 @@ public interface ServiceDiscovery {
/**
* lookup service by rpcServiceName
*
* @param rpcServiceName rpc service name
* @param rpcRequest rpc service pojo
* @return service address
*/
InetSocketAddress lookupService(String rpcServiceName);
InetSocketAddress lookupService(RpcRequest rpcRequest);
}
......@@ -6,6 +6,7 @@ import github.javaguide.extension.ExtensionLoader;
import github.javaguide.loadbalance.LoadBalance;
import github.javaguide.registry.ServiceDiscovery;
import github.javaguide.registry.zk.util.CuratorUtils;
import github.javaguide.remoting.dto.RpcRequest;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
......@@ -27,14 +28,15 @@ public class ZkServiceDiscovery implements ServiceDiscovery {
}
@Override
public InetSocketAddress lookupService(String rpcServiceName) {
public InetSocketAddress lookupService(RpcRequest rpcRequest) {
String rpcServiceName = rpcRequest.toRpcProperties().toRpcServiceName();
CuratorFramework zkClient = CuratorUtils.getZkClient();
List<String> serviceUrlList = CuratorUtils.getChildrenNodes(zkClient, rpcServiceName);
if (serviceUrlList == null || serviceUrlList.size() == 0) {
throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND, rpcServiceName);
}
// load balancing
String targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList, rpcServiceName);
String targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList, rpcRequest);
log.info("Successfully found the service address:[{}]", targetServiceUrl);
String[] socketAddressArray = targetServiceUrl.split(":");
String host = socketAddressArray[0];
......
......@@ -98,9 +98,9 @@ public final class NettyRpcClient implements RpcRequestTransport {
// build return value
CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();
// build rpc service name by rpcRequest
String rpcServiceName = rpcRequest.toRpcProperties().toRpcServiceName();
// String rpcServiceName = rpcRequest.toRpcProperties().toRpcServiceName();
// get server address
InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcServiceName);
InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest);
// get server address related channel
Channel channel = getChannel(inetSocketAddress);
if (channel.isActive()) {
......
......@@ -35,7 +35,7 @@ public class SocketRpcClient implements RpcRequestTransport {
// build rpc service name by rpcRequest
String rpcServiceName = RpcServiceProperties.builder().serviceName(rpcRequest.getInterfaceName())
.group(rpcRequest.getGroup()).version(rpcRequest.getVersion()).build().toRpcServiceName();
InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcServiceName);
InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest);
try (Socket socket = new Socket()) {
socket.connect(inetSocketAddress);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
......
......@@ -2,11 +2,13 @@ package github.javaguide.loadbalance.loadbalancer;
import github.javaguide.extension.ExtensionLoader;
import github.javaguide.loadbalance.LoadBalance;
import github.javaguide.remoting.dto.RpcRequest;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
......@@ -17,10 +19,29 @@ class ConsistentHashLoadBalanceTest {
LoadBalance loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("loadBalance");
List<String> serviceUrlList = new ArrayList<>(Arrays.asList("127.0.0.1:9997", "127.0.0.1:9998", "127.0.0.1:9999"));
String userRpcServiceName = "github.javaguide.UserServicetest1version1";
String userServiceAddress = loadBalance.selectServiceAddress(serviceUrlList, userRpcServiceName);
//build rpcCall
RpcRequest rpcRequest = RpcRequest.builder()
// .parameters(args)
.interfaceName(userRpcServiceName)
// .paramTypes(method.getParameterTypes())
.requestId(UUID.randomUUID().toString())
.group("test2")
.version("version2")
.build();
String userServiceAddress = loadBalance.selectServiceAddress(serviceUrlList, rpcRequest);
assertEquals("127.0.0.1:9999",userServiceAddress);
String schoolRpcServiceName = "github.javaguide.SchoolServicetest1version1";
String schoolServiceAddress = loadBalance.selectServiceAddress(serviceUrlList, schoolRpcServiceName);
rpcRequest = RpcRequest.builder()
// .parameters(args)
.interfaceName(userRpcServiceName)
// .paramTypes(method.getParameterTypes())
.requestId(UUID.randomUUID().toString())
.group("test2")
.version("version2")
.build();
String schoolServiceAddress = loadBalance.selectServiceAddress(serviceUrlList, rpcRequest);
assertEquals("127.0.0.1:9997",schoolServiceAddress);
}
}
\ No newline at end of file
......@@ -2,9 +2,11 @@ package github.javaguide.registry;
import github.javaguide.registry.zk.ZkServiceDiscovery;
import github.javaguide.registry.zk.ZkServiceRegistry;
import github.javaguide.remoting.dto.RpcRequest;
import org.junit.jupiter.api.Test;
import java.net.InetSocketAddress;
import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
......@@ -21,7 +23,15 @@ class ZkServiceRegistryTest {
InetSocketAddress givenInetSocketAddress = new InetSocketAddress("127.0.0.1", 9333);
zkServiceRegistry.registerService("github.javaguide.registry.zk.ZkServiceRegistry", givenInetSocketAddress);
ServiceDiscovery zkServiceDiscovery = new ZkServiceDiscovery();
InetSocketAddress acquiredInetSocketAddress = zkServiceDiscovery.lookupService("github.javaguide.registry.zk.ZkServiceRegistry");
RpcRequest rpcRequest = RpcRequest.builder()
// .parameters(args)
.interfaceName("github.javaguide.registry.zk.ZkServiceRegistry")
// .paramTypes(method.getParameterTypes())
.requestId(UUID.randomUUID().toString())
.group("test2")
.version("version2")
.build();
InetSocketAddress acquiredInetSocketAddress = zkServiceDiscovery.lookupService(rpcRequest);
assertEquals(givenInetSocketAddress.toString(), acquiredInetSocketAddress.toString());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册