diff --git a/example-client/src/main/java/github/javaguide/NettyClientMain.java b/example-client/src/main/java/github/javaguide/NettyClientMain.java index ee05469654f8cee15d70fe66a9e789cf02671505..3143b71f337296d14f16138862907290978bbb45 100644 --- a/example-client/src/main/java/github/javaguide/NettyClientMain.java +++ b/example-client/src/main/java/github/javaguide/NettyClientMain.java @@ -17,5 +17,6 @@ public class NettyClientMain { //如需使用 assert 断言,需要在 VM options 添加参数:-ea assert "Hello description is 222".equals(hello); String hello2 = helloService.hello(new Hello("111", "222")); + System.out.println(hello2); } } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/registry/ServiceDiscovery.java b/rpc-framework-simple/src/main/java/github/javaguide/registry/ServiceDiscovery.java new file mode 100644 index 0000000000000000000000000000000000000000..c835f8e015e9fc96256502be15f852b3864d7085 --- /dev/null +++ b/rpc-framework-simple/src/main/java/github/javaguide/registry/ServiceDiscovery.java @@ -0,0 +1,19 @@ +package github.javaguide.registry; + +import java.net.InetSocketAddress; + +/** + * 服务发现接口 + * + * @author shuang.kou + * @createTime 2020年06月01日 15:16:00 + */ +public interface ServiceDiscovery { + /** + * 查找服务 + * + * @param serviceName 服务名称 + * @return 提供服务的地址 + */ + InetSocketAddress lookupService(String serviceName); +} diff --git a/rpc-framework-simple/src/main/java/github/javaguide/registry/ServiceRegistry.java b/rpc-framework-simple/src/main/java/github/javaguide/registry/ServiceRegistry.java index d97ea493aba2c547b3595ffc3c75e14a2e1f4f8c..dc7a288afa360d6144cae89bd292d4ef3beb7c50 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/registry/ServiceRegistry.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/registry/ServiceRegistry.java @@ -3,7 +3,7 @@ package github.javaguide.registry; import java.net.InetSocketAddress; /** - * 服务注册中心接口 + * 服务注册接口 * * @author shuang.kou * @createTime 2020年05月13日 08:39:00 @@ -17,11 +17,4 @@ public interface ServiceRegistry { */ void registerService(String serviceName, InetSocketAddress inetSocketAddress); - /** - * 查找服务 - * - * @param serviceName 服务名称 - * @return 提供服务的地址 - */ - InetSocketAddress lookupService(String serviceName); } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceDiscovery.java b/rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceDiscovery.java new file mode 100644 index 0000000000000000000000000000000000000000..7377e65fec9d7b6d587dff280db5641b5d116840 --- /dev/null +++ b/rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceDiscovery.java @@ -0,0 +1,33 @@ +package github.javaguide.registry; + +import github.javaguide.utils.zk.CuratorHelper; +import org.apache.curator.framework.CuratorFramework; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; + +/** + * 基于 zookeeper 实现服务发现 + * + * @author shuang.kou + * @createTime 2020年06月01日 15:16:00 + */ +public class ZkServiceDiscovery implements ServiceDiscovery { + private static final Logger logger = LoggerFactory.getLogger(ZkServiceDiscovery.class); + private final CuratorFramework zkClient; + + public ZkServiceDiscovery() { + this.zkClient = CuratorHelper.getZkClient(); + zkClient.start(); + } + + @Override + public InetSocketAddress lookupService(String serviceName) { + // TODO 负载均衡 + // 这里直接去了第一个找到的服务地址 + String serviceAddress = CuratorHelper.getChildrenNodes(zkClient, serviceName).get(0); + logger.info("成功找到服务地址:{}", serviceAddress); + return new InetSocketAddress(serviceAddress.split(":")[0], Integer.parseInt(serviceAddress.split(":")[1])); + } +} diff --git a/rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceRegistry.java b/rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceRegistry.java index 84c2ecc8fdb6ba470abfd1891003aec5e8670508..5e3edbeec922243a2bafa655b68894c5d553c869 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceRegistry.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceRegistry.java @@ -8,13 +8,13 @@ import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; /** - * 基于 zookeeper 实现服务注册中心 + * 基于 zookeeper 实现服务注册 * * @author shuang.kou * @createTime 2020年05月31日 10:56:00 */ public class ZkServiceRegistry implements ServiceRegistry { - private static final Logger logger = LoggerFactory.getLogger(CuratorHelper.class); + private static final Logger logger = LoggerFactory.getLogger(ZkServiceRegistry.class); private final CuratorFramework zkClient; public ZkServiceRegistry() { @@ -31,13 +31,4 @@ public class ZkServiceRegistry implements ServiceRegistry { CuratorHelper.createEphemeralNode(zkClient, servicePath.toString()); logger.info("节点创建成功,节点为:{}", servicePath); } - - @Override - public InetSocketAddress lookupService(String serviceName) { - // TODO 负载均衡 - // 这里直接去了第一个找到的服务地址 - String serviceAddress = CuratorHelper.getChildrenNodes(zkClient, serviceName).get(0); - logger.info("成功找到服务地址:{}", serviceAddress); - return new InetSocketAddress(serviceAddress.split(":")[0], Integer.parseInt(serviceAddress.split(":")[1])); - } } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/transport/netty/client/NettyClientClientTransport.java b/rpc-framework-simple/src/main/java/github/javaguide/transport/netty/client/NettyClientClientTransport.java index 53e637738595ef9fa85d9920ac90f380aa43ea9a..6d876b2500be2915363b92fd1e97ed5c59022c32 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/transport/netty/client/NettyClientClientTransport.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/transport/netty/client/NettyClientClientTransport.java @@ -2,8 +2,8 @@ package github.javaguide.transport.netty.client; import github.javaguide.dto.RpcRequest; import github.javaguide.dto.RpcResponse; -import github.javaguide.registry.ServiceRegistry; -import github.javaguide.registry.ZkServiceRegistry; +import github.javaguide.registry.ServiceDiscovery; +import github.javaguide.registry.ZkServiceDiscovery; import github.javaguide.transport.ClientTransport; import github.javaguide.utils.checker.RpcMessageChecker; import io.netty.channel.Channel; @@ -16,24 +16,24 @@ import java.net.InetSocketAddress; import java.util.concurrent.atomic.AtomicReference; /** - * 基于 Netty 传输 RpcRequest + * 基于 Netty 传输 RpcRequest。 * * @author shuang.kou * @createTime 2020年05月29日 11:34:00 */ public class NettyClientClientTransport implements ClientTransport { private static final Logger logger = LoggerFactory.getLogger(NettyClientClientTransport.class); - private ServiceRegistry serviceRegistry; + private final ServiceDiscovery serviceDiscovery; public NettyClientClientTransport() { - this.serviceRegistry = new ZkServiceRegistry(); + this.serviceDiscovery = new ZkServiceDiscovery(); } @Override public Object sendRpcRequest(RpcRequest rpcRequest) { AtomicReference result = new AtomicReference<>(null); try { - InetSocketAddress inetSocketAddress = serviceRegistry.lookupService(rpcRequest.getInterfaceName()); + InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest.getInterfaceName()); Channel channel = ChannelProvider.get(inetSocketAddress); if (channel.isActive()) { channel.writeAndFlush(rpcRequest).addListener((ChannelFutureListener) future -> { diff --git a/rpc-framework-simple/src/main/java/github/javaguide/transport/socket/SocketRpcClient.java b/rpc-framework-simple/src/main/java/github/javaguide/transport/socket/SocketRpcClient.java index 7bf050da3f079e81b37aaa86ce8ab2c0e382e2cf..18f22f7658ce600fe7e1f2618a86aab74afc5265 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/transport/socket/SocketRpcClient.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/transport/socket/SocketRpcClient.java @@ -3,8 +3,8 @@ package github.javaguide.transport.socket; import github.javaguide.dto.RpcRequest; import github.javaguide.dto.RpcResponse; import github.javaguide.exception.RpcException; -import github.javaguide.registry.ServiceRegistry; -import github.javaguide.registry.ZkServiceRegistry; +import github.javaguide.registry.ServiceDiscovery; +import github.javaguide.registry.ZkServiceDiscovery; import github.javaguide.transport.ClientTransport; import github.javaguide.utils.checker.RpcMessageChecker; import lombok.AllArgsConstructor; @@ -26,15 +26,15 @@ import java.net.Socket; @AllArgsConstructor public class SocketRpcClient implements ClientTransport { private static final Logger logger = LoggerFactory.getLogger(SocketRpcClient.class); - private final ServiceRegistry serviceRegistry; + private final ServiceDiscovery serviceDiscovery; public SocketRpcClient() { - this.serviceRegistry = new ZkServiceRegistry(); + this.serviceDiscovery = new ZkServiceDiscovery(); } @Override public Object sendRpcRequest(RpcRequest rpcRequest) { - InetSocketAddress inetSocketAddress = serviceRegistry.lookupService(rpcRequest.getInterfaceName()); + InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest.getInterfaceName()); try (Socket socket = new Socket()) { socket.connect(inetSocketAddress); ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); diff --git a/rpc-framework-simple/src/test/java/github/javaguide/registry/ZkServiceRegistryTest.java b/rpc-framework-simple/src/test/java/github/javaguide/registry/ZkServiceRegistryTest.java index c7af864757e2bb3de60a1cc662e8b1e0b575c1fd..08adaa8fccdd3bb43942b8ea3e02060809d17437 100644 --- a/rpc-framework-simple/src/test/java/github/javaguide/registry/ZkServiceRegistryTest.java +++ b/rpc-framework-simple/src/test/java/github/javaguide/registry/ZkServiceRegistryTest.java @@ -15,10 +15,11 @@ class ZkServiceRegistryTest { @Test void should_register_service_successful_and_lookup_service_by_service_name() { - ZkServiceRegistry zkServiceRegistry = new ZkServiceRegistry(); + ServiceRegistry zkServiceRegistry = new ZkServiceRegistry(); InetSocketAddress givenInetSocketAddress = new InetSocketAddress("127.0.0.1", 9333); zkServiceRegistry.registerService("github.javaguide.registry.ZkServiceRegistry", givenInetSocketAddress); - InetSocketAddress acquiredInetSocketAddress = zkServiceRegistry.lookupService("github.javaguide.registry.ZkServiceRegistry"); + ServiceDiscovery zkServiceDiscovery = new ZkServiceDiscovery(); + InetSocketAddress acquiredInetSocketAddress = zkServiceDiscovery.lookupService("github.javaguide.registry.ZkServiceRegistry"); assertEquals(givenInetSocketAddress.toString(), acquiredInetSocketAddress.toString()); } }