From c4e912c607649827b210b0d9d2fcd6b3b2906419 Mon Sep 17 00:00:00 2001 From: mesmerizeBy <17770080230@qq.com> Date: Fri, 16 Apr 2021 11:34:14 +0800 Subject: [PATCH] Fix consistency hash bug --- .../loadbalance/AbstractLoadBalance.java | 8 +++--- .../javaguide/loadbalance/LoadBalance.java | 3 ++- .../ConsistentHashLoadBalance.java | 12 ++++++--- .../loadbalancer/RandomLoadBalance.java | 3 ++- .../javaguide/registry/ServiceDiscovery.java | 5 ++-- .../registry/zk/ZkServiceDiscovery.java | 6 +++-- .../netty/client/NettyRpcClient.java | 4 +-- .../transport/socket/SocketRpcClient.java | 2 +- .../ConsistentHashLoadBalanceTest.java | 25 +++++++++++++++++-- .../registry/ZkServiceRegistryTest.java | 12 ++++++++- 10 files changed, 61 insertions(+), 19 deletions(-) diff --git a/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/AbstractLoadBalance.java b/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/AbstractLoadBalance.java index 3df30e3..29e805d 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/AbstractLoadBalance.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/AbstractLoadBalance.java @@ -1,5 +1,7 @@ package github.javaguide.loadbalance; +import github.javaguide.remoting.dto.RpcRequest; + import java.util.List; /** @@ -10,16 +12,16 @@ import java.util.List; */ public abstract class AbstractLoadBalance implements LoadBalance { @Override - public String selectServiceAddress(List serviceAddresses, String rpcServiceName) { + public String selectServiceAddress(List serviceAddresses, RpcRequest rpcRequest) { if (serviceAddresses == null || serviceAddresses.size() == 0) { return null; } if (serviceAddresses.size() == 1) { return serviceAddresses.get(0); } - return doSelect(serviceAddresses, rpcServiceName); + return doSelect(serviceAddresses, rpcRequest); } - protected abstract String doSelect(List serviceAddresses, String rpcServiceName); + protected abstract String doSelect(List serviceAddresses, RpcRequest rpcRequest); } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/LoadBalance.java b/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/LoadBalance.java index 6b8bc3e..4a6fa26 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/LoadBalance.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/LoadBalance.java @@ -1,6 +1,7 @@ package github.javaguide.loadbalance; import github.javaguide.extension.SPI; +import github.javaguide.remoting.dto.RpcRequest; import java.util.List; @@ -18,5 +19,5 @@ public interface LoadBalance { * @param serviceAddresses Service address list * @return target service address */ - String selectServiceAddress(List serviceAddresses, String rpcServiceName); + String selectServiceAddress(List serviceAddresses, RpcRequest rpcRequest); } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/loadbalancer/ConsistentHashLoadBalance.java b/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/loadbalancer/ConsistentHashLoadBalance.java index ee6d59f..29be79e 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/loadbalancer/ConsistentHashLoadBalance.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/loadbalancer/ConsistentHashLoadBalance.java @@ -1,11 +1,13 @@ package github.javaguide.loadbalance.loadbalancer; import github.javaguide.loadbalance.AbstractLoadBalance; +import github.javaguide.remoting.dto.RpcRequest; import lombok.extern.slf4j.Slf4j; import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -22,8 +24,10 @@ public class ConsistentHashLoadBalance extends AbstractLoadBalance { private final ConcurrentHashMap selectors = new ConcurrentHashMap<>(); @Override - protected String doSelect(List serviceAddresses, String rpcServiceName) { + protected String doSelect(List serviceAddresses, RpcRequest rpcRequest) { int identityHashCode = System.identityHashCode(serviceAddresses); + // build rpc service name by rpcRequest + String rpcServiceName = rpcRequest.toRpcProperties().toRpcServiceName(); ConsistentHashSelector selector = selectors.get(rpcServiceName); @@ -33,7 +37,7 @@ public class ConsistentHashLoadBalance extends AbstractLoadBalance { selector = selectors.get(rpcServiceName); } - return selector.select(rpcServiceName); + return selector.select(rpcServiceName+ Arrays.stream(rpcRequest.getParameters())); } static class ConsistentHashSelector { @@ -73,8 +77,8 @@ public class ConsistentHashLoadBalance extends AbstractLoadBalance { return ((long) (digest[3 + idx * 4] & 255) << 24 | (long) (digest[2 + idx * 4] & 255) << 16 | (long) (digest[1 + idx * 4] & 255) << 8 | (long) (digest[idx * 4] & 255)) & 4294967295L; } - public String select(String rpcServiceName) { - byte[] digest = md5(rpcServiceName); + public String select(String rpcServiceKey) { + byte[] digest = md5(rpcServiceKey); return selectForKey(hash(digest, 0)); } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/loadbalancer/RandomLoadBalance.java b/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/loadbalancer/RandomLoadBalance.java index 0d8dd6d..3e18eca 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/loadbalancer/RandomLoadBalance.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/loadbalancer/RandomLoadBalance.java @@ -1,6 +1,7 @@ package github.javaguide.loadbalance.loadbalancer; import github.javaguide.loadbalance.AbstractLoadBalance; +import github.javaguide.remoting.dto.RpcRequest; import java.util.List; import java.util.Random; @@ -13,7 +14,7 @@ import java.util.Random; */ public class RandomLoadBalance extends AbstractLoadBalance { @Override - protected String doSelect(List serviceAddresses, String rpcServiceName) { + protected String doSelect(List serviceAddresses, RpcRequest rpcRequest) { Random random = new Random(); return serviceAddresses.get(random.nextInt(serviceAddresses.size())); } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/registry/ServiceDiscovery.java b/rpc-framework-simple/src/main/java/github/javaguide/registry/ServiceDiscovery.java index b885af0..443fa34 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/registry/ServiceDiscovery.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/registry/ServiceDiscovery.java @@ -1,6 +1,7 @@ package github.javaguide.registry; import github.javaguide.extension.SPI; +import github.javaguide.remoting.dto.RpcRequest; import java.net.InetSocketAddress; @@ -15,8 +16,8 @@ public interface ServiceDiscovery { /** * lookup service by rpcServiceName * - * @param rpcServiceName rpc service name + * @param rpcRequest rpc service pojo * @return service address */ - InetSocketAddress lookupService(String rpcServiceName); + InetSocketAddress lookupService(RpcRequest rpcRequest); } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/registry/zk/ZkServiceDiscovery.java b/rpc-framework-simple/src/main/java/github/javaguide/registry/zk/ZkServiceDiscovery.java index ab67202..786cac8 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/registry/zk/ZkServiceDiscovery.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/registry/zk/ZkServiceDiscovery.java @@ -6,6 +6,7 @@ import github.javaguide.extension.ExtensionLoader; import github.javaguide.loadbalance.LoadBalance; import github.javaguide.registry.ServiceDiscovery; import github.javaguide.registry.zk.util.CuratorUtils; +import github.javaguide.remoting.dto.RpcRequest; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; @@ -27,14 +28,15 @@ public class ZkServiceDiscovery implements ServiceDiscovery { } @Override - public InetSocketAddress lookupService(String rpcServiceName) { + public InetSocketAddress lookupService(RpcRequest rpcRequest) { + String rpcServiceName = rpcRequest.toRpcProperties().toRpcServiceName(); CuratorFramework zkClient = CuratorUtils.getZkClient(); List serviceUrlList = CuratorUtils.getChildrenNodes(zkClient, rpcServiceName); if (serviceUrlList == null || serviceUrlList.size() == 0) { throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND, rpcServiceName); } // load balancing - String targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList, rpcServiceName); + String targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList, rpcRequest); log.info("Successfully found the service address:[{}]", targetServiceUrl); String[] socketAddressArray = targetServiceUrl.split(":"); String host = socketAddressArray[0]; diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyRpcClient.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyRpcClient.java index 3d789dd..e438b76 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyRpcClient.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyRpcClient.java @@ -98,9 +98,9 @@ public final class NettyRpcClient implements RpcRequestTransport { // build return value CompletableFuture> resultFuture = new CompletableFuture<>(); // build rpc service name by rpcRequest - String rpcServiceName = rpcRequest.toRpcProperties().toRpcServiceName(); +// String rpcServiceName = rpcRequest.toRpcProperties().toRpcServiceName(); // get server address - InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcServiceName); + InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest); // get server address related channel Channel channel = getChannel(inetSocketAddress); if (channel.isActive()) { diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcClient.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcClient.java index e6dd307..ebd4e3b 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcClient.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcClient.java @@ -35,7 +35,7 @@ public class SocketRpcClient implements RpcRequestTransport { // build rpc service name by rpcRequest String rpcServiceName = RpcServiceProperties.builder().serviceName(rpcRequest.getInterfaceName()) .group(rpcRequest.getGroup()).version(rpcRequest.getVersion()).build().toRpcServiceName(); - InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcServiceName); + InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest); try (Socket socket = new Socket()) { socket.connect(inetSocketAddress); ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); diff --git a/rpc-framework-simple/src/test/java/github/javaguide/loadbalance/loadbalancer/ConsistentHashLoadBalanceTest.java b/rpc-framework-simple/src/test/java/github/javaguide/loadbalance/loadbalancer/ConsistentHashLoadBalanceTest.java index e420477..1c0f6b8 100644 --- a/rpc-framework-simple/src/test/java/github/javaguide/loadbalance/loadbalancer/ConsistentHashLoadBalanceTest.java +++ b/rpc-framework-simple/src/test/java/github/javaguide/loadbalance/loadbalancer/ConsistentHashLoadBalanceTest.java @@ -2,11 +2,13 @@ package github.javaguide.loadbalance.loadbalancer; import github.javaguide.extension.ExtensionLoader; import github.javaguide.loadbalance.LoadBalance; +import github.javaguide.remoting.dto.RpcRequest; import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.UUID; import static org.junit.Assert.assertEquals; @@ -17,10 +19,29 @@ class ConsistentHashLoadBalanceTest { LoadBalance loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("loadBalance"); List serviceUrlList = new ArrayList<>(Arrays.asList("127.0.0.1:9997", "127.0.0.1:9998", "127.0.0.1:9999")); String userRpcServiceName = "github.javaguide.UserServicetest1version1"; - String userServiceAddress = loadBalance.selectServiceAddress(serviceUrlList, userRpcServiceName); + //build rpcCall + RpcRequest rpcRequest = RpcRequest.builder() +// .parameters(args) + .interfaceName(userRpcServiceName) +// .paramTypes(method.getParameterTypes()) + .requestId(UUID.randomUUID().toString()) + .group("test2") + .version("version2") + .build(); + String userServiceAddress = loadBalance.selectServiceAddress(serviceUrlList, rpcRequest); assertEquals("127.0.0.1:9999",userServiceAddress); + + String schoolRpcServiceName = "github.javaguide.SchoolServicetest1version1"; - String schoolServiceAddress = loadBalance.selectServiceAddress(serviceUrlList, schoolRpcServiceName); + rpcRequest = RpcRequest.builder() +// .parameters(args) + .interfaceName(userRpcServiceName) +// .paramTypes(method.getParameterTypes()) + .requestId(UUID.randomUUID().toString()) + .group("test2") + .version("version2") + .build(); + String schoolServiceAddress = loadBalance.selectServiceAddress(serviceUrlList, rpcRequest); assertEquals("127.0.0.1:9997",schoolServiceAddress); } } \ No newline at end of file diff --git a/rpc-framework-simple/src/test/java/github/javaguide/registry/ZkServiceRegistryTest.java b/rpc-framework-simple/src/test/java/github/javaguide/registry/ZkServiceRegistryTest.java index a7507df..7274a5f 100644 --- a/rpc-framework-simple/src/test/java/github/javaguide/registry/ZkServiceRegistryTest.java +++ b/rpc-framework-simple/src/test/java/github/javaguide/registry/ZkServiceRegistryTest.java @@ -2,9 +2,11 @@ package github.javaguide.registry; import github.javaguide.registry.zk.ZkServiceDiscovery; import github.javaguide.registry.zk.ZkServiceRegistry; +import github.javaguide.remoting.dto.RpcRequest; import org.junit.jupiter.api.Test; import java.net.InetSocketAddress; +import java.util.UUID; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -21,7 +23,15 @@ class ZkServiceRegistryTest { InetSocketAddress givenInetSocketAddress = new InetSocketAddress("127.0.0.1", 9333); zkServiceRegistry.registerService("github.javaguide.registry.zk.ZkServiceRegistry", givenInetSocketAddress); ServiceDiscovery zkServiceDiscovery = new ZkServiceDiscovery(); - InetSocketAddress acquiredInetSocketAddress = zkServiceDiscovery.lookupService("github.javaguide.registry.zk.ZkServiceRegistry"); + RpcRequest rpcRequest = RpcRequest.builder() +// .parameters(args) + .interfaceName("github.javaguide.registry.zk.ZkServiceRegistry") +// .paramTypes(method.getParameterTypes()) + .requestId(UUID.randomUUID().toString()) + .group("test2") + .version("version2") + .build(); + InetSocketAddress acquiredInetSocketAddress = zkServiceDiscovery.lookupService(rpcRequest); assertEquals(givenInetSocketAddress.toString(), acquiredInetSocketAddress.toString()); } } -- GitLab