提交 467b2d8d 编写于 作者: S shuang.kou

[v3.0]update package structure and zookeeper utils

上级 83332b42
# guide-rpc-framework
guide-rpc-framework 是一款基于 Netty+Kyro+Zookeeper 实现的 RPC 框架。代码注释详细,结构清晰,非常适合阅读和学习。
guide-rpc-framework 是一款基于 Netty+Kyro+Zookeeper 实现的 RPC 框架。代码注释详细,结构清晰,并且集成了 Check Style 规范代码结构,非常适合阅读和学习。
由于Guide哥自身精力和能力有限,如果大家觉得有需要改进和完善的地方的话,欢迎将本项目 clone 到自己本地,在本地修改后提交 PR 给我,我会在第一时间 Review 你的代码。
......@@ -56,6 +56,10 @@ guide-rpc-framework 是一款基于 Netty+Kyro+Zookeeper 实现的 RPC 框架。
**1.导入项目**
克隆项目到自己的本地:`git clone git@github.com:Snailclimb/guide-rpc-framework.git`
然后使用 IDEA 打开,等待项目初始化完成。
**2.初始化git hooks**
> 以下演示的是 Mac/Linux对应的操作,Window 用户需要手动将 `config/git-hooks` 目录下的`pre-commit ` 文件拷贝到 项目下的 `.git/hooks/` 目录。
......@@ -95,12 +99,18 @@ exit $RESULT
**3.CheckStyle 插件下载和配置**
Settings->Plugins->搜索下载CheckStyle 插件,然后按照如下方式
IntelliJ IDEA-> Preferences->Plugins->搜索下载CheckStyle 插件,然后按照如下方式进行配置。
![](./images/setting-check-style.png)
配置完成之后,按照如下方式使用这个插件!
![](./images/run-check-style.png)
**4.下载运行zookeeper**
这里使用 Docker 来下载安装。
## 相关问题
### 为什么要造这个轮子?Dubbo不香么?
......
......@@ -16,7 +16,7 @@ import java.util.concurrent.TimeUnit;
* @author shuang.kou
* @createTime 2020年05月26日 16:00:00
*/
public final class ThreadPoolFactory {
public final class ThreadPoolFactoryUtils {
/**
* 线程池参数
*/
......@@ -25,7 +25,7 @@ public final class ThreadPoolFactory {
private static final int KEEP_ALIVE_TIME = 1;
private static final int BLOCKING_QUEUE_CAPACITY = 100;
private ThreadPoolFactory() {
private ThreadPoolFactoryUtils() {
}
......
......@@ -19,37 +19,31 @@ import java.util.concurrent.ConcurrentHashMap;
* @createTime 2020年05月31日 11:38:00
*/
@Slf4j
public final class CuratorHelper {
public final class CuratorUtils {
private static final int BASE_SLEEP_TIME = 1000;
private static final int MAX_RETRIES = 5;
private static final String CONNECT_STRING = "127.0.0.1:2181";
public static final String ZK_REGISTER_ROOT_PATH = "/my-rpc";
private static Map<String, List<String>> serviceAddressMap = new ConcurrentHashMap<>();
private static CuratorFramework zkClient = getZkClient();
private static CuratorFramework zkClient;
private CuratorHelper() {
static {
zkClient = getZkClient();
}
public static CuratorFramework getZkClient() {
// 重试策略。重试3次,并且会增加重试之间的睡眠时间。
RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES);
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
//要连接的服务器(可以是服务器列表)
.connectString(CONNECT_STRING)
.retryPolicy(retryPolicy)
.build();
curatorFramework.start();
return curatorFramework;
private CuratorUtils() {
}
/**
* 创建临时节点
* 临时节点驻存在ZooKeeper中,当连接和session断掉时被删除。
* 创建持久化节点。不同于临时节点,持久化节点不会因为客户端断开连接而被删除
*
* @param path 节点路径
*/
public static void createEphemeralNode(String path) {
public static void createPersistentNode(String path) {
try {
if (zkClient.checkExists().forPath(path) == null) {
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
//eg: /my-rpc/github.javaguide.HelloService/127.0.0.1:9999
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
log.info("节点创建成功,节点为:[{}]", path);
} else {
log.info("节点已经存在,节点为:[{}]", path);
......@@ -61,13 +55,16 @@ public final class CuratorHelper {
/**
* 获取某个字节下的子节点,也就是获取所有提供服务的生产者的地址
*
* @param serviceName 服务对象接口名 eg:github.javaguide.HelloService
* @return 指定字节下的所有子节点
*/
public static List<String> getChildrenNodes(String serviceName) {
if (serviceAddressMap.containsKey(serviceName)) {
return serviceAddressMap.get(serviceName);
}
List<String> result;
String servicePath = CuratorHelper.ZK_REGISTER_ROOT_PATH + "/" + serviceName;
String servicePath = ZK_REGISTER_ROOT_PATH + "/" + serviceName;
try {
result = zkClient.getChildren().forPath(servicePath);
serviceAddressMap.put(serviceName, result);
......@@ -78,13 +75,25 @@ public final class CuratorHelper {
return result;
}
private static CuratorFramework getZkClient() {
// 重试策略。重试3次,并且会增加重试之间的睡眠时间。
RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES);
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
//要连接的服务器(可以是服务器列表)
.connectString(CONNECT_STRING)
.retryPolicy(retryPolicy)
.build();
curatorFramework.start();
return curatorFramework;
}
/**
* 注册监听
* 注册监听指定节点。
*
* @param serviceName 服务名称
* @param serviceName 服务对象接口名 eg:github.javaguide.HelloService
*/
private static void registerWatcher(CuratorFramework zkClient, String serviceName) {
String servicePath = CuratorHelper.ZK_REGISTER_ROOT_PATH + "/" + serviceName;
String servicePath = ZK_REGISTER_ROOT_PATH + "/" + serviceName;
PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, servicePath, true);
PathChildrenCacheListener pathChildrenCacheListener = (curatorFramework, pathChildrenCacheEvent) -> {
List<String> serviceAddresses = curatorFramework.getChildren().forPath(servicePath);
......@@ -94,7 +103,8 @@ public final class CuratorHelper {
try {
pathChildrenCache.start();
} catch (Exception e) {
log.error("occur exception:", e);
throw new RpcException(e.getMessage(), e.getCause());
}
}
}
package github.javaguide.registry;
import github.javaguide.utils.zk.CuratorHelper;
import github.javaguide.utils.zk.CuratorUtils;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
......@@ -17,9 +17,12 @@ public class ZkServiceDiscovery implements ServiceDiscovery {
@Override
public InetSocketAddress lookupService(String serviceName) {
// TODO(shuang.kou):feat: 负载均衡
// 这里直接去了第一个找到的服务地址
String serviceAddress = CuratorHelper.getChildrenNodes(serviceName).get(0);
// 这里直接去了第一个找到的服务地址,eg:127.0.0.1:99990000000017
String serviceAddress = CuratorUtils.getChildrenNodes(serviceName).get(0);
log.info("成功找到服务地址:{}", serviceAddress);
return new InetSocketAddress(serviceAddress.split(":")[0], Integer.parseInt(serviceAddress.split(":")[1]));
String[] socketAddressArray = serviceAddress.split(":");
String host = socketAddressArray[0];
int port = Integer.parseInt(socketAddressArray[1]);
return new InetSocketAddress(host, port);
}
}
package github.javaguide.registry;
import github.javaguide.utils.zk.CuratorHelper;
import github.javaguide.utils.zk.CuratorUtils;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
......@@ -17,9 +17,9 @@ public class ZkServiceRegistry implements ServiceRegistry {
@Override
public void registerService(String serviceName, InetSocketAddress inetSocketAddress) {
//根节点下注册子节点:服务
StringBuilder servicePath = new StringBuilder(CuratorHelper.ZK_REGISTER_ROOT_PATH).append("/").append(serviceName);
StringBuilder servicePath = new StringBuilder(CuratorUtils.ZK_REGISTER_ROOT_PATH).append("/").append(serviceName);
//服务子节点下注册子节点:服务地址
servicePath.append(inetSocketAddress.toString());
CuratorHelper.createEphemeralNode(servicePath.toString());
CuratorUtils.createPersistentNode(servicePath.toString());
}
}
......@@ -3,8 +3,8 @@ package github.javaguide.remoting.transport.netty.server;
import github.javaguide.remoting.dto.RpcRequest;
import github.javaguide.remoting.dto.RpcResponse;
import github.javaguide.handler.RpcRequestHandler;
import github.javaguide.utils.concurrent.ThreadPoolFactory;
import github.javaguide.utils.factory.SingletonFactory;
import github.javaguide.utils.concurrent.ThreadPoolFactoryUtils;
import github.javaguide.factory.SingletonFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
......@@ -33,7 +33,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
public NettyServerHandler() {
this.rpcRequestHandler = SingletonFactory.getInstance(RpcRequestHandler.class);
this.threadPool = ThreadPoolFactory.createDefaultThreadPool(THREAD_NAME_PREFIX);
this.threadPool = ThreadPoolFactoryUtils.createDefaultThreadPool(THREAD_NAME_PREFIX);
}
@Override
......
......@@ -3,7 +3,7 @@ package github.javaguide.remoting.transport.socket;
import github.javaguide.remoting.dto.RpcRequest;
import github.javaguide.remoting.dto.RpcResponse;
import github.javaguide.handler.RpcRequestHandler;
import github.javaguide.utils.factory.SingletonFactory;
import github.javaguide.factory.SingletonFactory;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
......
......@@ -4,7 +4,7 @@ import github.javaguide.provider.ServiceProvider;
import github.javaguide.provider.ServiceProviderImpl;
import github.javaguide.registry.ServiceRegistry;
import github.javaguide.registry.ZkServiceRegistry;
import github.javaguide.utils.concurrent.ThreadPoolFactory;
import github.javaguide.utils.concurrent.ThreadPoolFactoryUtils;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
......@@ -30,7 +30,7 @@ public class SocketRpcServer {
public SocketRpcServer(String host, int port) {
this.host = host;
this.port = port;
threadPool = ThreadPoolFactory.createDefaultThreadPool("socket-server-rpc-pool");
threadPool = ThreadPoolFactoryUtils.createDefaultThreadPool("socket-server-rpc-pool");
serviceRegistry = new ZkServiceRegistry();
serviceProvider = new ServiceProviderImpl();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册