From 467b2d8d32c9782126fd55d3b15c2f818f87219f Mon Sep 17 00:00:00 2001 From: "shuang.kou" Date: Thu, 4 Jun 2020 10:30:58 +0800 Subject: [PATCH] [v3.0]update package structure and zookeeper utils --- README.md | 14 ++++- .../{utils => }/factory/SingletonFactory.java | 2 +- ...ctory.java => ThreadPoolFactoryUtils.java} | 4 +- .../{CuratorHelper.java => CuratorUtils.java} | 54 +++++++++++-------- .../registry/ZkServiceDiscovery.java | 11 ++-- .../javaguide/registry/ZkServiceRegistry.java | 6 +-- .../netty/server/NettyServerHandler.java | 6 +-- .../SocketRpcRequestHandlerRunnable.java | 2 +- .../transport/socket/SocketRpcServer.java | 4 +- 9 files changed, 63 insertions(+), 40 deletions(-) rename rpc-framework-common/src/main/java/github/javaguide/{utils => }/factory/SingletonFactory.java (95%) rename rpc-framework-common/src/main/java/github/javaguide/utils/concurrent/{ThreadPoolFactory.java => ThreadPoolFactoryUtils.java} (96%) rename rpc-framework-common/src/main/java/github/javaguide/utils/zk/{CuratorHelper.java => CuratorUtils.java} (76%) diff --git a/README.md b/README.md index afacc6a..4c4d445 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # guide-rpc-framework -guide-rpc-framework 是一款基于 Netty+Kyro+Zookeeper 实现的 RPC 框架。代码注释详细,结构清晰,非常适合阅读和学习。 +guide-rpc-framework 是一款基于 Netty+Kyro+Zookeeper 实现的 RPC 框架。代码注释详细,结构清晰,并且集成了 Check Style 规范代码结构,非常适合阅读和学习。 由于Guide哥自身精力和能力有限,如果大家觉得有需要改进和完善的地方的话,欢迎将本项目 clone 到自己本地,在本地修改后提交 PR 给我,我会在第一时间 Review 你的代码。 @@ -56,6 +56,10 @@ guide-rpc-framework 是一款基于 Netty+Kyro+Zookeeper 实现的 RPC 框架。 **1.导入项目** +克隆项目到自己的本地:`git clone git@github.com:Snailclimb/guide-rpc-framework.git` + +然后使用 IDEA 打开,等待项目初始化完成。 + **2.初始化git hooks** > 以下演示的是 Mac/Linux对应的操作,Window 用户需要手动将 `config/git-hooks` 目录下的`pre-commit ` 文件拷贝到 项目下的 `.git/hooks/` 目录。 @@ -95,12 +99,18 @@ exit $RESULT **3.CheckStyle 插件下载和配置** -Settings->Plugins->搜索下载CheckStyle 插件,然后按照如下方式 +IntelliJ IDEA-> Preferences->Plugins->搜索下载CheckStyle 插件,然后按照如下方式进行配置。 ![](./images/setting-check-style.png) +配置完成之后,按照如下方式使用这个插件! + ![](./images/run-check-style.png) +**4.下载运行zookeeper** + +这里使用 Docker 来下载安装。 + ## 相关问题 ### 为什么要造这个轮子?Dubbo不香么? diff --git a/rpc-framework-common/src/main/java/github/javaguide/utils/factory/SingletonFactory.java b/rpc-framework-common/src/main/java/github/javaguide/factory/SingletonFactory.java similarity index 95% rename from rpc-framework-common/src/main/java/github/javaguide/utils/factory/SingletonFactory.java rename to rpc-framework-common/src/main/java/github/javaguide/factory/SingletonFactory.java index f234825..664be03 100644 --- a/rpc-framework-common/src/main/java/github/javaguide/utils/factory/SingletonFactory.java +++ b/rpc-framework-common/src/main/java/github/javaguide/factory/SingletonFactory.java @@ -1,4 +1,4 @@ -package github.javaguide.utils.factory; +package github.javaguide.factory; import java.util.HashMap; import java.util.Map; diff --git a/rpc-framework-common/src/main/java/github/javaguide/utils/concurrent/ThreadPoolFactory.java b/rpc-framework-common/src/main/java/github/javaguide/utils/concurrent/ThreadPoolFactoryUtils.java similarity index 96% rename from rpc-framework-common/src/main/java/github/javaguide/utils/concurrent/ThreadPoolFactory.java rename to rpc-framework-common/src/main/java/github/javaguide/utils/concurrent/ThreadPoolFactoryUtils.java index c338003..33ee4ca 100644 --- a/rpc-framework-common/src/main/java/github/javaguide/utils/concurrent/ThreadPoolFactory.java +++ b/rpc-framework-common/src/main/java/github/javaguide/utils/concurrent/ThreadPoolFactoryUtils.java @@ -16,7 +16,7 @@ import java.util.concurrent.TimeUnit; * @author shuang.kou * @createTime 2020年05月26日 16:00:00 */ -public final class ThreadPoolFactory { +public final class ThreadPoolFactoryUtils { /** * 线程池参数 */ @@ -25,7 +25,7 @@ public final class ThreadPoolFactory { private static final int KEEP_ALIVE_TIME = 1; private static final int BLOCKING_QUEUE_CAPACITY = 100; - private ThreadPoolFactory() { + private ThreadPoolFactoryUtils() { } diff --git a/rpc-framework-common/src/main/java/github/javaguide/utils/zk/CuratorHelper.java b/rpc-framework-common/src/main/java/github/javaguide/utils/zk/CuratorUtils.java similarity index 76% rename from rpc-framework-common/src/main/java/github/javaguide/utils/zk/CuratorHelper.java rename to rpc-framework-common/src/main/java/github/javaguide/utils/zk/CuratorUtils.java index a879a32..cd31372 100644 --- a/rpc-framework-common/src/main/java/github/javaguide/utils/zk/CuratorHelper.java +++ b/rpc-framework-common/src/main/java/github/javaguide/utils/zk/CuratorUtils.java @@ -19,37 +19,31 @@ import java.util.concurrent.ConcurrentHashMap; * @createTime 2020年05月31日 11:38:00 */ @Slf4j -public final class CuratorHelper { +public final class CuratorUtils { private static final int BASE_SLEEP_TIME = 1000; private static final int MAX_RETRIES = 5; private static final String CONNECT_STRING = "127.0.0.1:2181"; public static final String ZK_REGISTER_ROOT_PATH = "/my-rpc"; private static Map> serviceAddressMap = new ConcurrentHashMap<>(); - private static CuratorFramework zkClient = getZkClient(); + private static CuratorFramework zkClient; - private CuratorHelper() { + static { + zkClient = getZkClient(); } - public static CuratorFramework getZkClient() { - // 重试策略。重试3次,并且会增加重试之间的睡眠时间。 - RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES); - CuratorFramework curatorFramework = CuratorFrameworkFactory.builder() - //要连接的服务器(可以是服务器列表) - .connectString(CONNECT_STRING) - .retryPolicy(retryPolicy) - .build(); - curatorFramework.start(); - return curatorFramework; + private CuratorUtils() { } /** - * 创建临时节点 - * 临时节点驻存在ZooKeeper中,当连接和session断掉时被删除。 + * 创建持久化节点。不同于临时节点,持久化节点不会因为客户端断开连接而被删除 + * + * @param path 节点路径 */ - public static void createEphemeralNode(String path) { + public static void createPersistentNode(String path) { try { if (zkClient.checkExists().forPath(path) == null) { - zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path); + //eg: /my-rpc/github.javaguide.HelloService/127.0.0.1:9999 + zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path); log.info("节点创建成功,节点为:[{}]", path); } else { log.info("节点已经存在,节点为:[{}]", path); @@ -61,13 +55,16 @@ public final class CuratorHelper { /** * 获取某个字节下的子节点,也就是获取所有提供服务的生产者的地址 + * + * @param serviceName 服务对象接口名 eg:github.javaguide.HelloService + * @return 指定字节下的所有子节点 */ public static List getChildrenNodes(String serviceName) { if (serviceAddressMap.containsKey(serviceName)) { return serviceAddressMap.get(serviceName); } List result; - String servicePath = CuratorHelper.ZK_REGISTER_ROOT_PATH + "/" + serviceName; + String servicePath = ZK_REGISTER_ROOT_PATH + "/" + serviceName; try { result = zkClient.getChildren().forPath(servicePath); serviceAddressMap.put(serviceName, result); @@ -78,13 +75,25 @@ public final class CuratorHelper { return result; } + private static CuratorFramework getZkClient() { + // 重试策略。重试3次,并且会增加重试之间的睡眠时间。 + RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES); + CuratorFramework curatorFramework = CuratorFrameworkFactory.builder() + //要连接的服务器(可以是服务器列表) + .connectString(CONNECT_STRING) + .retryPolicy(retryPolicy) + .build(); + curatorFramework.start(); + return curatorFramework; + } + /** - * 注册监听 + * 注册监听指定节点。 * - * @param serviceName 服务名称 + * @param serviceName 服务对象接口名 eg:github.javaguide.HelloService */ private static void registerWatcher(CuratorFramework zkClient, String serviceName) { - String servicePath = CuratorHelper.ZK_REGISTER_ROOT_PATH + "/" + serviceName; + String servicePath = ZK_REGISTER_ROOT_PATH + "/" + serviceName; PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, servicePath, true); PathChildrenCacheListener pathChildrenCacheListener = (curatorFramework, pathChildrenCacheEvent) -> { List serviceAddresses = curatorFramework.getChildren().forPath(servicePath); @@ -94,7 +103,8 @@ public final class CuratorHelper { try { pathChildrenCache.start(); } catch (Exception e) { - log.error("occur exception:", e); + throw new RpcException(e.getMessage(), e.getCause()); } } + } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceDiscovery.java b/rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceDiscovery.java index 9de90f0..c65568a 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceDiscovery.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceDiscovery.java @@ -1,6 +1,6 @@ package github.javaguide.registry; -import github.javaguide.utils.zk.CuratorHelper; +import github.javaguide.utils.zk.CuratorUtils; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; @@ -17,9 +17,12 @@ public class ZkServiceDiscovery implements ServiceDiscovery { @Override public InetSocketAddress lookupService(String serviceName) { // TODO(shuang.kou):feat: 负载均衡 - // 这里直接去了第一个找到的服务地址 - String serviceAddress = CuratorHelper.getChildrenNodes(serviceName).get(0); + // 这里直接去了第一个找到的服务地址,eg:127.0.0.1:99990000000017 + String serviceAddress = CuratorUtils.getChildrenNodes(serviceName).get(0); log.info("成功找到服务地址:{}", serviceAddress); - return new InetSocketAddress(serviceAddress.split(":")[0], Integer.parseInt(serviceAddress.split(":")[1])); + String[] socketAddressArray = serviceAddress.split(":"); + String host = socketAddressArray[0]; + int port = Integer.parseInt(socketAddressArray[1]); + return new InetSocketAddress(host, port); } } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceRegistry.java b/rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceRegistry.java index b0170cf..33297de 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceRegistry.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceRegistry.java @@ -1,6 +1,6 @@ package github.javaguide.registry; -import github.javaguide.utils.zk.CuratorHelper; +import github.javaguide.utils.zk.CuratorUtils; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; @@ -17,9 +17,9 @@ public class ZkServiceRegistry implements ServiceRegistry { @Override public void registerService(String serviceName, InetSocketAddress inetSocketAddress) { //根节点下注册子节点:服务 - StringBuilder servicePath = new StringBuilder(CuratorHelper.ZK_REGISTER_ROOT_PATH).append("/").append(serviceName); + StringBuilder servicePath = new StringBuilder(CuratorUtils.ZK_REGISTER_ROOT_PATH).append("/").append(serviceName); //服务子节点下注册子节点:服务地址 servicePath.append(inetSocketAddress.toString()); - CuratorHelper.createEphemeralNode(servicePath.toString()); + CuratorUtils.createPersistentNode(servicePath.toString()); } } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/server/NettyServerHandler.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/server/NettyServerHandler.java index 5d95498..c176232 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/server/NettyServerHandler.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/server/NettyServerHandler.java @@ -3,8 +3,8 @@ package github.javaguide.remoting.transport.netty.server; import github.javaguide.remoting.dto.RpcRequest; import github.javaguide.remoting.dto.RpcResponse; import github.javaguide.handler.RpcRequestHandler; -import github.javaguide.utils.concurrent.ThreadPoolFactory; -import github.javaguide.utils.factory.SingletonFactory; +import github.javaguide.utils.concurrent.ThreadPoolFactoryUtils; +import github.javaguide.factory.SingletonFactory; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; @@ -33,7 +33,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { public NettyServerHandler() { this.rpcRequestHandler = SingletonFactory.getInstance(RpcRequestHandler.class); - this.threadPool = ThreadPoolFactory.createDefaultThreadPool(THREAD_NAME_PREFIX); + this.threadPool = ThreadPoolFactoryUtils.createDefaultThreadPool(THREAD_NAME_PREFIX); } @Override diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcRequestHandlerRunnable.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcRequestHandlerRunnable.java index 935c0fd..e28253c 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcRequestHandlerRunnable.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcRequestHandlerRunnable.java @@ -3,7 +3,7 @@ package github.javaguide.remoting.transport.socket; import github.javaguide.remoting.dto.RpcRequest; import github.javaguide.remoting.dto.RpcResponse; import github.javaguide.handler.RpcRequestHandler; -import github.javaguide.utils.factory.SingletonFactory; +import github.javaguide.factory.SingletonFactory; import lombok.extern.slf4j.Slf4j; import java.io.IOException; diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcServer.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcServer.java index 7b1904d..d633574 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcServer.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcServer.java @@ -4,7 +4,7 @@ import github.javaguide.provider.ServiceProvider; import github.javaguide.provider.ServiceProviderImpl; import github.javaguide.registry.ServiceRegistry; import github.javaguide.registry.ZkServiceRegistry; -import github.javaguide.utils.concurrent.ThreadPoolFactory; +import github.javaguide.utils.concurrent.ThreadPoolFactoryUtils; import lombok.extern.slf4j.Slf4j; import java.io.IOException; @@ -30,7 +30,7 @@ public class SocketRpcServer { public SocketRpcServer(String host, int port) { this.host = host; this.port = port; - threadPool = ThreadPoolFactory.createDefaultThreadPool("socket-server-rpc-pool"); + threadPool = ThreadPoolFactoryUtils.createDefaultThreadPool("socket-server-rpc-pool"); serviceRegistry = new ZkServiceRegistry(); serviceProvider = new ServiceProviderImpl(); } -- GitLab