diff --git a/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/AbstractLoadBalance.java b/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/AbstractLoadBalance.java index 3df30e305dcc963218313344fe107f459cb932c3..29e805d2c2e9b154e8360ff321bac3e290dc083d 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/AbstractLoadBalance.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/AbstractLoadBalance.java @@ -1,5 +1,7 @@ package github.javaguide.loadbalance; +import github.javaguide.remoting.dto.RpcRequest; + import java.util.List; /** @@ -10,16 +12,16 @@ import java.util.List; */ public abstract class AbstractLoadBalance implements LoadBalance { @Override - public String selectServiceAddress(List serviceAddresses, String rpcServiceName) { + public String selectServiceAddress(List serviceAddresses, RpcRequest rpcRequest) { if (serviceAddresses == null || serviceAddresses.size() == 0) { return null; } if (serviceAddresses.size() == 1) { return serviceAddresses.get(0); } - return doSelect(serviceAddresses, rpcServiceName); + return doSelect(serviceAddresses, rpcRequest); } - protected abstract String doSelect(List serviceAddresses, String rpcServiceName); + protected abstract String doSelect(List serviceAddresses, RpcRequest rpcRequest); } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/LoadBalance.java b/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/LoadBalance.java index 6b8bc3e5025f4383d8d4de039c0aa8ec60cf70c8..4a6fa26675e2a1d6c39e0631ae38a2efbd5aebd7 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/LoadBalance.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/LoadBalance.java @@ -1,6 +1,7 @@ package github.javaguide.loadbalance; import github.javaguide.extension.SPI; +import github.javaguide.remoting.dto.RpcRequest; import java.util.List; @@ -18,5 +19,5 @@ public interface LoadBalance { * @param serviceAddresses Service address list * @return target service address */ - String selectServiceAddress(List serviceAddresses, String rpcServiceName); + String selectServiceAddress(List serviceAddresses, RpcRequest rpcRequest); } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/loadbalancer/ConsistentHashLoadBalance.java b/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/loadbalancer/ConsistentHashLoadBalance.java index ee6d59f5907286fe2abd4b6595f585562b4ca009..29be79effc3451dadcf80beb014c9a3210e825c1 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/loadbalancer/ConsistentHashLoadBalance.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/loadbalancer/ConsistentHashLoadBalance.java @@ -1,11 +1,13 @@ package github.javaguide.loadbalance.loadbalancer; import github.javaguide.loadbalance.AbstractLoadBalance; +import github.javaguide.remoting.dto.RpcRequest; import lombok.extern.slf4j.Slf4j; import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -22,8 +24,10 @@ public class ConsistentHashLoadBalance extends AbstractLoadBalance { private final ConcurrentHashMap selectors = new ConcurrentHashMap<>(); @Override - protected String doSelect(List serviceAddresses, String rpcServiceName) { + protected String doSelect(List serviceAddresses, RpcRequest rpcRequest) { int identityHashCode = System.identityHashCode(serviceAddresses); + // build rpc service name by rpcRequest + String rpcServiceName = rpcRequest.toRpcProperties().toRpcServiceName(); ConsistentHashSelector selector = selectors.get(rpcServiceName); @@ -33,7 +37,7 @@ public class ConsistentHashLoadBalance extends AbstractLoadBalance { selector = selectors.get(rpcServiceName); } - return selector.select(rpcServiceName); + return selector.select(rpcServiceName+ Arrays.stream(rpcRequest.getParameters())); } static class ConsistentHashSelector { @@ -73,8 +77,8 @@ public class ConsistentHashLoadBalance extends AbstractLoadBalance { return ((long) (digest[3 + idx * 4] & 255) << 24 | (long) (digest[2 + idx * 4] & 255) << 16 | (long) (digest[1 + idx * 4] & 255) << 8 | (long) (digest[idx * 4] & 255)) & 4294967295L; } - public String select(String rpcServiceName) { - byte[] digest = md5(rpcServiceName); + public String select(String rpcServiceKey) { + byte[] digest = md5(rpcServiceKey); return selectForKey(hash(digest, 0)); } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/loadbalancer/RandomLoadBalance.java b/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/loadbalancer/RandomLoadBalance.java index 0d8dd6d6bb810939aae03d8cb61c24121a90989c..3e18eca6e559ce9569101c9a71dae130bc0886b6 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/loadbalancer/RandomLoadBalance.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/loadbalance/loadbalancer/RandomLoadBalance.java @@ -1,6 +1,7 @@ package github.javaguide.loadbalance.loadbalancer; import github.javaguide.loadbalance.AbstractLoadBalance; +import github.javaguide.remoting.dto.RpcRequest; import java.util.List; import java.util.Random; @@ -13,7 +14,7 @@ import java.util.Random; */ public class RandomLoadBalance extends AbstractLoadBalance { @Override - protected String doSelect(List serviceAddresses, String rpcServiceName) { + protected String doSelect(List serviceAddresses, RpcRequest rpcRequest) { Random random = new Random(); return serviceAddresses.get(random.nextInt(serviceAddresses.size())); } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/registry/ServiceDiscovery.java b/rpc-framework-simple/src/main/java/github/javaguide/registry/ServiceDiscovery.java index b885af0121726628a891eab758189deedd8f7765..443fa3461fab5cacd301f2ffe44a87ffcb064f08 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/registry/ServiceDiscovery.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/registry/ServiceDiscovery.java @@ -1,6 +1,7 @@ package github.javaguide.registry; import github.javaguide.extension.SPI; +import github.javaguide.remoting.dto.RpcRequest; import java.net.InetSocketAddress; @@ -15,8 +16,8 @@ public interface ServiceDiscovery { /** * lookup service by rpcServiceName * - * @param rpcServiceName rpc service name + * @param rpcRequest rpc service pojo * @return service address */ - InetSocketAddress lookupService(String rpcServiceName); + InetSocketAddress lookupService(RpcRequest rpcRequest); } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/registry/zk/ZkServiceDiscovery.java b/rpc-framework-simple/src/main/java/github/javaguide/registry/zk/ZkServiceDiscovery.java index ab672023d51fff86221291f43a3542b33a844ab8..786cac89125478e9eaf23494455438e4bd16a3e3 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/registry/zk/ZkServiceDiscovery.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/registry/zk/ZkServiceDiscovery.java @@ -6,6 +6,7 @@ import github.javaguide.extension.ExtensionLoader; import github.javaguide.loadbalance.LoadBalance; import github.javaguide.registry.ServiceDiscovery; import github.javaguide.registry.zk.util.CuratorUtils; +import github.javaguide.remoting.dto.RpcRequest; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; @@ -27,14 +28,15 @@ public class ZkServiceDiscovery implements ServiceDiscovery { } @Override - public InetSocketAddress lookupService(String rpcServiceName) { + public InetSocketAddress lookupService(RpcRequest rpcRequest) { + String rpcServiceName = rpcRequest.toRpcProperties().toRpcServiceName(); CuratorFramework zkClient = CuratorUtils.getZkClient(); List serviceUrlList = CuratorUtils.getChildrenNodes(zkClient, rpcServiceName); if (serviceUrlList == null || serviceUrlList.size() == 0) { throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND, rpcServiceName); } // load balancing - String targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList, rpcServiceName); + String targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList, rpcRequest); log.info("Successfully found the service address:[{}]", targetServiceUrl); String[] socketAddressArray = targetServiceUrl.split(":"); String host = socketAddressArray[0]; diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyRpcClient.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyRpcClient.java index 3d789dd65718e46fa0ead8dd220ec2d39016ea39..e438b762dbd7000a33d7a32d64e722db097cbc5c 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyRpcClient.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyRpcClient.java @@ -98,9 +98,9 @@ public final class NettyRpcClient implements RpcRequestTransport { // build return value CompletableFuture> resultFuture = new CompletableFuture<>(); // build rpc service name by rpcRequest - String rpcServiceName = rpcRequest.toRpcProperties().toRpcServiceName(); +// String rpcServiceName = rpcRequest.toRpcProperties().toRpcServiceName(); // get server address - InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcServiceName); + InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest); // get server address related channel Channel channel = getChannel(inetSocketAddress); if (channel.isActive()) { diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcClient.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcClient.java index e6dd3077f0a1b27686e3dc1ebceeb07181fcc72e..ebd4e3b2f5908564a31c8c82a03bc97e5b6e96d4 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcClient.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcClient.java @@ -35,7 +35,7 @@ public class SocketRpcClient implements RpcRequestTransport { // build rpc service name by rpcRequest String rpcServiceName = RpcServiceProperties.builder().serviceName(rpcRequest.getInterfaceName()) .group(rpcRequest.getGroup()).version(rpcRequest.getVersion()).build().toRpcServiceName(); - InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcServiceName); + InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest); try (Socket socket = new Socket()) { socket.connect(inetSocketAddress); ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); diff --git a/rpc-framework-simple/src/test/java/github/javaguide/loadbalance/loadbalancer/ConsistentHashLoadBalanceTest.java b/rpc-framework-simple/src/test/java/github/javaguide/loadbalance/loadbalancer/ConsistentHashLoadBalanceTest.java index e420477f9fc5533dfa5136c901eedb120cc5d5a8..1c0f6b8012357115e945039b54f393af256b56c3 100644 --- a/rpc-framework-simple/src/test/java/github/javaguide/loadbalance/loadbalancer/ConsistentHashLoadBalanceTest.java +++ b/rpc-framework-simple/src/test/java/github/javaguide/loadbalance/loadbalancer/ConsistentHashLoadBalanceTest.java @@ -2,11 +2,13 @@ package github.javaguide.loadbalance.loadbalancer; import github.javaguide.extension.ExtensionLoader; import github.javaguide.loadbalance.LoadBalance; +import github.javaguide.remoting.dto.RpcRequest; import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.UUID; import static org.junit.Assert.assertEquals; @@ -17,10 +19,29 @@ class ConsistentHashLoadBalanceTest { LoadBalance loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("loadBalance"); List serviceUrlList = new ArrayList<>(Arrays.asList("127.0.0.1:9997", "127.0.0.1:9998", "127.0.0.1:9999")); String userRpcServiceName = "github.javaguide.UserServicetest1version1"; - String userServiceAddress = loadBalance.selectServiceAddress(serviceUrlList, userRpcServiceName); + //build rpcCall + RpcRequest rpcRequest = RpcRequest.builder() +// .parameters(args) + .interfaceName(userRpcServiceName) +// .paramTypes(method.getParameterTypes()) + .requestId(UUID.randomUUID().toString()) + .group("test2") + .version("version2") + .build(); + String userServiceAddress = loadBalance.selectServiceAddress(serviceUrlList, rpcRequest); assertEquals("127.0.0.1:9999",userServiceAddress); + + String schoolRpcServiceName = "github.javaguide.SchoolServicetest1version1"; - String schoolServiceAddress = loadBalance.selectServiceAddress(serviceUrlList, schoolRpcServiceName); + rpcRequest = RpcRequest.builder() +// .parameters(args) + .interfaceName(userRpcServiceName) +// .paramTypes(method.getParameterTypes()) + .requestId(UUID.randomUUID().toString()) + .group("test2") + .version("version2") + .build(); + String schoolServiceAddress = loadBalance.selectServiceAddress(serviceUrlList, rpcRequest); assertEquals("127.0.0.1:9997",schoolServiceAddress); } } \ No newline at end of file diff --git a/rpc-framework-simple/src/test/java/github/javaguide/registry/ZkServiceRegistryTest.java b/rpc-framework-simple/src/test/java/github/javaguide/registry/ZkServiceRegistryTest.java index a7507df577264d5ab4aa1468a9ab6ab1e46b8c0f..7274a5f88013044d7d53f672b808c9fd60f64dce 100644 --- a/rpc-framework-simple/src/test/java/github/javaguide/registry/ZkServiceRegistryTest.java +++ b/rpc-framework-simple/src/test/java/github/javaguide/registry/ZkServiceRegistryTest.java @@ -2,9 +2,11 @@ package github.javaguide.registry; import github.javaguide.registry.zk.ZkServiceDiscovery; import github.javaguide.registry.zk.ZkServiceRegistry; +import github.javaguide.remoting.dto.RpcRequest; import org.junit.jupiter.api.Test; import java.net.InetSocketAddress; +import java.util.UUID; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -21,7 +23,15 @@ class ZkServiceRegistryTest { InetSocketAddress givenInetSocketAddress = new InetSocketAddress("127.0.0.1", 9333); zkServiceRegistry.registerService("github.javaguide.registry.zk.ZkServiceRegistry", givenInetSocketAddress); ServiceDiscovery zkServiceDiscovery = new ZkServiceDiscovery(); - InetSocketAddress acquiredInetSocketAddress = zkServiceDiscovery.lookupService("github.javaguide.registry.zk.ZkServiceRegistry"); + RpcRequest rpcRequest = RpcRequest.builder() +// .parameters(args) + .interfaceName("github.javaguide.registry.zk.ZkServiceRegistry") +// .paramTypes(method.getParameterTypes()) + .requestId(UUID.randomUUID().toString()) + .group("test2") + .version("version2") + .build(); + InetSocketAddress acquiredInetSocketAddress = zkServiceDiscovery.lookupService(rpcRequest); assertEquals(givenInetSocketAddress.toString(), acquiredInetSocketAddress.toString()); } }