From 5e935d579eec25701a6ce1d29fea95b161173ba3 Mon Sep 17 00:00:00 2001 From: Tboy Date: Mon, 30 Dec 2019 17:59:06 +0800 Subject: [PATCH] refactor AbstractZKClient (#1627) * we should insert alert DB once , and trigger this type of alert 3 times * refactor AbstractZKClient --- .../common/zk/AbstractZKClient.java | 96 +++++++------------ 1 file changed, 36 insertions(+), 60 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java index 0e95dddb3..c3ba71827 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java @@ -16,6 +16,22 @@ */ package org.apache.dolphinscheduler.common.zk; +import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP; +import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP; +import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX; +import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; +import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; +import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX; + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.lang3.StringUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.enums.ZKNodeType; @@ -23,26 +39,9 @@ import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.ResInfo; -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.commons.lang3.StringUtils; -import org.apache.curator.RetryPolicy; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.imps.CuratorFrameworkState; -import org.apache.curator.framework.recipes.locks.InterProcessMutex; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.framework.state.ConnectionStateListener; -import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.zookeeper.CreateMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; - -import static org.apache.dolphinscheduler.common.Constants.*; - /** * abstract zookeeper client @@ -70,8 +69,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{ return; } - byte[] bytes = zkClient.getData().forPath(znode); - String resInfoStr = new String(bytes); + String resInfoStr = super.get(znode); String[] splits = resInfoStr.split(Constants.COMMA); if (splits.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){ return; @@ -107,8 +105,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{ String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX : WORKER_PREFIX; String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo; - if(zkClient.checkExists().forPath(zNode) == null || - zkClient.checkExists().forPath(deadServerPath) != null ){ + if(!isExisted(zNode) || isExisted(deadServerPath)){ return true; } @@ -118,14 +115,12 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{ public void removeDeadServerByHost(String host, String serverType) throws Exception { - List deadServers = zkClient.getChildren().forPath(getDeadZNodeParentPath()); + List deadServers = super.getChildrenKeys(getDeadZNodeParentPath()); for(String serverPath : deadServers){ if(serverPath.startsWith(serverType+UNDERLINE+host)){ - String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath; - if(zkClient.checkExists().forPath(server) != null){ - zkClient.delete().forPath(server); - logger.info("{} server {} deleted from zk dead server path success" , serverType , host); - } + String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath; + super.remove(server); + logger.info("{} server {} deleted from zk dead server path success" , serverType , host); } } } @@ -143,8 +138,8 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{ // create temporary sequence nodes for master znode String parentPath = getZNodeParentPath(zkNodeType); String serverPathPrefix = parentPath + "/" + OSUtils.getHost(); - String registerPath = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath( - serverPathPrefix + UNDERLINE, heartbeatZKInfo.getBytes()); + String registerPath = serverPathPrefix + UNDERLINE; + super.persistEphemeral(registerPath, heartbeatZKInfo); logger.info("register {} node {} success" , zkNodeType.toString(), registerPath); return registerPath; } @@ -165,7 +160,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{ } registerPath = createZNodePath(zkNodeType); - // handle dead server + // handle dead server handleDeadServer(registerPath, zkNodeType, Constants.DELETE_ZK_OP); return registerPath; @@ -196,10 +191,10 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{ }else if(opType.equals(ADD_ZK_OP)){ String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo; - if(zkClient.checkExists().forPath(deadServerPath) == null){ + if(!super.isExisted(deadServerPath)){ //add dead server info to zk dead server path : /dead-servers/ - zkClient.create().forPath(deadServerPath,(type + UNDERLINE + ipSeqNo).getBytes()); + super.persist(deadServerPath,(type + UNDERLINE + ipSeqNo)); logger.info("{} server dead , and {} added to zk dead server path success" , zkNodeType.toString(), zNode); @@ -226,19 +221,13 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{ List childrenList = new ArrayList<>(); try { // read master node parent path from conf - if(zkClient.checkExists().forPath(getZNodeParentPath(ZKNodeType.MASTER)) != null){ - childrenList = zkClient.getChildren().forPath(getZNodeParentPath(ZKNodeType.MASTER)); + if(super.isExisted(getZNodeParentPath(ZKNodeType.MASTER))){ + childrenList = super.getChildrenKeys(getZNodeParentPath(ZKNodeType.MASTER)); } } catch (Exception e) { - if(e.getMessage().contains("java.lang.IllegalStateException: instance must be started")){ - logger.error("zookeeper service not started",e); - }else{ - logger.error(e.getMessage(),e); - } - - }finally { - return childrenList.size(); + logger.error("getActiveMasterNum error",e); } + return childrenList.size(); } /** @@ -280,10 +269,9 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{ Map masterMap = new HashMap<>(); try { String path = getZNodeParentPath(zkNodeType); - List serverList = getZkClient().getChildren().forPath(path); + List serverList = super.getChildrenKeys(path); for(String server : serverList){ - byte[] bytes = getZkClient().getData().forPath(path + "/" + server); - masterMap.putIfAbsent(server, new String(bytes)); + masterMap.putIfAbsent(server, super.get(path + "/" + server)); } } catch (Exception e) { logger.error("get server list failed : " + e.getMessage(), e); @@ -430,27 +418,15 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{ */ protected void initSystemZNode(){ try { - createNodePath(getMasterZNodeParentPath()); - createNodePath(getWorkerZNodeParentPath()); - createNodePath(getDeadZNodeParentPath()); + persist(getMasterZNodeParentPath(), ""); + persist(getWorkerZNodeParentPath(), ""); + persist(getDeadZNodeParentPath(), ""); } catch (Exception e) { logger.error("init system znode failed : " + e.getMessage(),e); } } - /** - * create zookeeper node path if not exists - * @param zNodeParentPath zookeeper parent path - * @throws Exception errors - */ - private void createNodePath(String zNodeParentPath) throws Exception { - if(null == zkClient.checkExists().forPath(zNodeParentPath)){ - zkClient.create().creatingParentContainersIfNeeded() - .withMode(CreateMode.PERSISTENT).forPath(zNodeParentPath); - } - } - /** * server self dead, stop all threads * @param serverHost server host -- GitLab