package github.javaguide.utils.zk; import github.javaguide.exception.RpcException; import lombok.extern.slf4j.Slf4j; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * @author shuang.kou * @createTime 2020年05月31日 11:38:00 */ @Slf4j public final class CuratorUtils { private static final int BASE_SLEEP_TIME = 1000; private static final int MAX_RETRIES = 5; private static final String CONNECT_STRING = "127.0.0.1:2181"; public static final String ZK_REGISTER_ROOT_PATH = "/my-rpc"; private static Map> serviceAddressMap = new ConcurrentHashMap<>(); private static CuratorFramework zkClient; static { zkClient = getZkClient(); } private CuratorUtils() { } /** * 创建持久化节点。不同于临时节点,持久化节点不会因为客户端断开连接而被删除 * * @param path 节点路径 */ public static void createPersistentNode(String path) { try { if (zkClient.checkExists().forPath(path) == null) { //eg: /my-rpc/github.javaguide.HelloService/127.0.0.1:9999 zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path); log.info("节点创建成功,节点为:[{}]", path); } else { log.info("节点已经存在,节点为:[{}]", path); } } catch (Exception e) { throw new RpcException(e.getMessage(), e.getCause()); } } /** * 获取某个字节下的子节点,也就是获取所有提供服务的生产者的地址 * * @param serviceName 服务对象接口名 eg:github.javaguide.HelloService * @return 指定字节下的所有子节点 */ public static List getChildrenNodes(String serviceName) { if (serviceAddressMap.containsKey(serviceName)) { return serviceAddressMap.get(serviceName); } List result; String servicePath = ZK_REGISTER_ROOT_PATH + "/" + serviceName; try { result = zkClient.getChildren().forPath(servicePath); serviceAddressMap.put(serviceName, result); registerWatcher(zkClient, serviceName); } catch (Exception e) { throw new RpcException(e.getMessage(), e.getCause()); } return result; } private static CuratorFramework getZkClient() { // 重试策略。重试3次,并且会增加重试之间的睡眠时间。 RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES); CuratorFramework curatorFramework = CuratorFrameworkFactory.builder() //要连接的服务器(可以是服务器列表) .connectString(CONNECT_STRING) .retryPolicy(retryPolicy) .build(); curatorFramework.start(); return curatorFramework; } /** * 注册监听指定节点。 * * @param serviceName 服务对象接口名 eg:github.javaguide.HelloService */ private static void registerWatcher(CuratorFramework zkClient, String serviceName) { String servicePath = ZK_REGISTER_ROOT_PATH + "/" + serviceName; PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, servicePath, true); PathChildrenCacheListener pathChildrenCacheListener = (curatorFramework, pathChildrenCacheEvent) -> { List serviceAddresses = curatorFramework.getChildren().forPath(servicePath); serviceAddressMap.put(serviceName, serviceAddresses); }; pathChildrenCache.getListenable().addListener(pathChildrenCacheListener); try { pathChildrenCache.start(); } catch (Exception e) { throw new RpcException(e.getMessage(), e.getCause()); } } }