From decac3e2d3b2ec1be6dfcc2b053f49b1579816f6 Mon Sep 17 00:00:00 2001 From: "shuang.kou" Date: Mon, 1 Jun 2020 15:33:28 +0800 Subject: [PATCH] [v3.0]refractor exact a interface : ServiceDiscovery --- .../github/javaguide/NettyClientMain.java | 1 + .../javaguide/registry/ServiceDiscovery.java | 19 +++++++++++ .../javaguide/registry/ServiceRegistry.java | 9 +---- .../registry/ZkServiceDiscovery.java | 33 +++++++++++++++++++ .../javaguide/registry/ZkServiceRegistry.java | 13 ++------ .../client/NettyClientClientTransport.java | 12 +++---- .../transport/socket/SocketRpcClient.java | 10 +++--- .../registry/ZkServiceRegistryTest.java | 5 +-- 8 files changed, 70 insertions(+), 32 deletions(-) create mode 100644 rpc-framework-simple/src/main/java/github/javaguide/registry/ServiceDiscovery.java create mode 100644 rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceDiscovery.java diff --git a/example-client/src/main/java/github/javaguide/NettyClientMain.java b/example-client/src/main/java/github/javaguide/NettyClientMain.java index ee05469..3143b71 100644 --- a/example-client/src/main/java/github/javaguide/NettyClientMain.java +++ b/example-client/src/main/java/github/javaguide/NettyClientMain.java @@ -17,5 +17,6 @@ public class NettyClientMain { //如需使用 assert 断言,需要在 VM options 添加参数:-ea assert "Hello description is 222".equals(hello); String hello2 = helloService.hello(new Hello("111", "222")); + System.out.println(hello2); } } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/registry/ServiceDiscovery.java b/rpc-framework-simple/src/main/java/github/javaguide/registry/ServiceDiscovery.java new file mode 100644 index 0000000..c835f8e --- /dev/null +++ b/rpc-framework-simple/src/main/java/github/javaguide/registry/ServiceDiscovery.java @@ -0,0 +1,19 @@ +package github.javaguide.registry; + +import java.net.InetSocketAddress; + +/** + * 服务发现接口 + * + * @author shuang.kou + * @createTime 2020年06月01日 15:16:00 + */ +public interface ServiceDiscovery { + /** + * 查找服务 + * + * @param serviceName 服务名称 + * @return 提供服务的地址 + */ + InetSocketAddress lookupService(String serviceName); +} diff --git a/rpc-framework-simple/src/main/java/github/javaguide/registry/ServiceRegistry.java b/rpc-framework-simple/src/main/java/github/javaguide/registry/ServiceRegistry.java index d97ea49..dc7a288 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/registry/ServiceRegistry.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/registry/ServiceRegistry.java @@ -3,7 +3,7 @@ package github.javaguide.registry; import java.net.InetSocketAddress; /** - * 服务注册中心接口 + * 服务注册接口 * * @author shuang.kou * @createTime 2020年05月13日 08:39:00 @@ -17,11 +17,4 @@ public interface ServiceRegistry { */ void registerService(String serviceName, InetSocketAddress inetSocketAddress); - /** - * 查找服务 - * - * @param serviceName 服务名称 - * @return 提供服务的地址 - */ - InetSocketAddress lookupService(String serviceName); } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceDiscovery.java b/rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceDiscovery.java new file mode 100644 index 0000000..7377e65 --- /dev/null +++ b/rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceDiscovery.java @@ -0,0 +1,33 @@ +package github.javaguide.registry; + +import github.javaguide.utils.zk.CuratorHelper; +import org.apache.curator.framework.CuratorFramework; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; + +/** + * 基于 zookeeper 实现服务发现 + * + * @author shuang.kou + * @createTime 2020年06月01日 15:16:00 + */ +public class ZkServiceDiscovery implements ServiceDiscovery { + private static final Logger logger = LoggerFactory.getLogger(ZkServiceDiscovery.class); + private final CuratorFramework zkClient; + + public ZkServiceDiscovery() { + this.zkClient = CuratorHelper.getZkClient(); + zkClient.start(); + } + + @Override + public InetSocketAddress lookupService(String serviceName) { + // TODO 负载均衡 + // 这里直接去了第一个找到的服务地址 + String serviceAddress = CuratorHelper.getChildrenNodes(zkClient, serviceName).get(0); + logger.info("成功找到服务地址:{}", serviceAddress); + return new InetSocketAddress(serviceAddress.split(":")[0], Integer.parseInt(serviceAddress.split(":")[1])); + } +} diff --git a/rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceRegistry.java b/rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceRegistry.java index 84c2ecc..5e3edbe 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceRegistry.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceRegistry.java @@ -8,13 +8,13 @@ import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; /** - * 基于 zookeeper 实现服务注册中心 + * 基于 zookeeper 实现服务注册 * * @author shuang.kou * @createTime 2020年05月31日 10:56:00 */ public class ZkServiceRegistry implements ServiceRegistry { - private static final Logger logger = LoggerFactory.getLogger(CuratorHelper.class); + private static final Logger logger = LoggerFactory.getLogger(ZkServiceRegistry.class); private final CuratorFramework zkClient; public ZkServiceRegistry() { @@ -31,13 +31,4 @@ public class ZkServiceRegistry implements ServiceRegistry { CuratorHelper.createEphemeralNode(zkClient, servicePath.toString()); logger.info("节点创建成功,节点为:{}", servicePath); } - - @Override - public InetSocketAddress lookupService(String serviceName) { - // TODO 负载均衡 - // 这里直接去了第一个找到的服务地址 - String serviceAddress = CuratorHelper.getChildrenNodes(zkClient, serviceName).get(0); - logger.info("成功找到服务地址:{}", serviceAddress); - return new InetSocketAddress(serviceAddress.split(":")[0], Integer.parseInt(serviceAddress.split(":")[1])); - } } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/transport/netty/client/NettyClientClientTransport.java b/rpc-framework-simple/src/main/java/github/javaguide/transport/netty/client/NettyClientClientTransport.java index 53e6377..6d876b2 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/transport/netty/client/NettyClientClientTransport.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/transport/netty/client/NettyClientClientTransport.java @@ -2,8 +2,8 @@ package github.javaguide.transport.netty.client; import github.javaguide.dto.RpcRequest; import github.javaguide.dto.RpcResponse; -import github.javaguide.registry.ServiceRegistry; -import github.javaguide.registry.ZkServiceRegistry; +import github.javaguide.registry.ServiceDiscovery; +import github.javaguide.registry.ZkServiceDiscovery; import github.javaguide.transport.ClientTransport; import github.javaguide.utils.checker.RpcMessageChecker; import io.netty.channel.Channel; @@ -16,24 +16,24 @@ import java.net.InetSocketAddress; import java.util.concurrent.atomic.AtomicReference; /** - * 基于 Netty 传输 RpcRequest + * 基于 Netty 传输 RpcRequest。 * * @author shuang.kou * @createTime 2020年05月29日 11:34:00 */ public class NettyClientClientTransport implements ClientTransport { private static final Logger logger = LoggerFactory.getLogger(NettyClientClientTransport.class); - private ServiceRegistry serviceRegistry; + private final ServiceDiscovery serviceDiscovery; public NettyClientClientTransport() { - this.serviceRegistry = new ZkServiceRegistry(); + this.serviceDiscovery = new ZkServiceDiscovery(); } @Override public Object sendRpcRequest(RpcRequest rpcRequest) { AtomicReference result = new AtomicReference<>(null); try { - InetSocketAddress inetSocketAddress = serviceRegistry.lookupService(rpcRequest.getInterfaceName()); + InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest.getInterfaceName()); Channel channel = ChannelProvider.get(inetSocketAddress); if (channel.isActive()) { channel.writeAndFlush(rpcRequest).addListener((ChannelFutureListener) future -> { diff --git a/rpc-framework-simple/src/main/java/github/javaguide/transport/socket/SocketRpcClient.java b/rpc-framework-simple/src/main/java/github/javaguide/transport/socket/SocketRpcClient.java index 7bf050d..18f22f7 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/transport/socket/SocketRpcClient.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/transport/socket/SocketRpcClient.java @@ -3,8 +3,8 @@ package github.javaguide.transport.socket; import github.javaguide.dto.RpcRequest; import github.javaguide.dto.RpcResponse; import github.javaguide.exception.RpcException; -import github.javaguide.registry.ServiceRegistry; -import github.javaguide.registry.ZkServiceRegistry; +import github.javaguide.registry.ServiceDiscovery; +import github.javaguide.registry.ZkServiceDiscovery; import github.javaguide.transport.ClientTransport; import github.javaguide.utils.checker.RpcMessageChecker; import lombok.AllArgsConstructor; @@ -26,15 +26,15 @@ import java.net.Socket; @AllArgsConstructor public class SocketRpcClient implements ClientTransport { private static final Logger logger = LoggerFactory.getLogger(SocketRpcClient.class); - private final ServiceRegistry serviceRegistry; + private final ServiceDiscovery serviceDiscovery; public SocketRpcClient() { - this.serviceRegistry = new ZkServiceRegistry(); + this.serviceDiscovery = new ZkServiceDiscovery(); } @Override public Object sendRpcRequest(RpcRequest rpcRequest) { - InetSocketAddress inetSocketAddress = serviceRegistry.lookupService(rpcRequest.getInterfaceName()); + InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest.getInterfaceName()); try (Socket socket = new Socket()) { socket.connect(inetSocketAddress); ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); diff --git a/rpc-framework-simple/src/test/java/github/javaguide/registry/ZkServiceRegistryTest.java b/rpc-framework-simple/src/test/java/github/javaguide/registry/ZkServiceRegistryTest.java index c7af864..08adaa8 100644 --- a/rpc-framework-simple/src/test/java/github/javaguide/registry/ZkServiceRegistryTest.java +++ b/rpc-framework-simple/src/test/java/github/javaguide/registry/ZkServiceRegistryTest.java @@ -15,10 +15,11 @@ class ZkServiceRegistryTest { @Test void should_register_service_successful_and_lookup_service_by_service_name() { - ZkServiceRegistry zkServiceRegistry = new ZkServiceRegistry(); + ServiceRegistry zkServiceRegistry = new ZkServiceRegistry(); InetSocketAddress givenInetSocketAddress = new InetSocketAddress("127.0.0.1", 9333); zkServiceRegistry.registerService("github.javaguide.registry.ZkServiceRegistry", givenInetSocketAddress); - InetSocketAddress acquiredInetSocketAddress = zkServiceRegistry.lookupService("github.javaguide.registry.ZkServiceRegistry"); + ServiceDiscovery zkServiceDiscovery = new ZkServiceDiscovery(); + InetSocketAddress acquiredInetSocketAddress = zkServiceDiscovery.lookupService("github.javaguide.registry.ZkServiceRegistry"); assertEquals(givenInetSocketAddress.toString(), acquiredInetSocketAddress.toString()); } } -- GitLab