提交 5e935d57 编写于 作者: T Tboy 提交者: bao liang

refactor AbstractZKClient (#1627)

* we should insert alert DB once , and trigger this type of alert 3 times

* refactor AbstractZKClient
上级 503be5f4
...@@ -16,6 +16,22 @@ ...@@ -16,6 +16,22 @@
*/ */
package org.apache.dolphinscheduler.common.zk; package org.apache.dolphinscheduler.common.zk;
import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP;
import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP;
import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.enums.ZKNodeType;
...@@ -23,26 +39,9 @@ import org.apache.dolphinscheduler.common.model.Server; ...@@ -23,26 +39,9 @@ import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.ResInfo; import org.apache.dolphinscheduler.common.utils.ResInfo;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.*;
import static org.apache.dolphinscheduler.common.Constants.*;
/** /**
* abstract zookeeper client * abstract zookeeper client
...@@ -70,8 +69,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{ ...@@ -70,8 +69,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
return; return;
} }
byte[] bytes = zkClient.getData().forPath(znode); String resInfoStr = super.get(znode);
String resInfoStr = new String(bytes);
String[] splits = resInfoStr.split(Constants.COMMA); String[] splits = resInfoStr.split(Constants.COMMA);
if (splits.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){ if (splits.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){
return; return;
...@@ -107,8 +105,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{ ...@@ -107,8 +105,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX : WORKER_PREFIX; String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX : WORKER_PREFIX;
String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo; String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo;
if(zkClient.checkExists().forPath(zNode) == null || if(!isExisted(zNode) || isExisted(deadServerPath)){
zkClient.checkExists().forPath(deadServerPath) != null ){
return true; return true;
} }
...@@ -118,14 +115,12 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{ ...@@ -118,14 +115,12 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
public void removeDeadServerByHost(String host, String serverType) throws Exception { public void removeDeadServerByHost(String host, String serverType) throws Exception {
List<String> deadServers = zkClient.getChildren().forPath(getDeadZNodeParentPath()); List<String> deadServers = super.getChildrenKeys(getDeadZNodeParentPath());
for(String serverPath : deadServers){ for(String serverPath : deadServers){
if(serverPath.startsWith(serverType+UNDERLINE+host)){ if(serverPath.startsWith(serverType+UNDERLINE+host)){
String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath; String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath;
if(zkClient.checkExists().forPath(server) != null){ super.remove(server);
zkClient.delete().forPath(server); logger.info("{} server {} deleted from zk dead server path success" , serverType , host);
logger.info("{} server {} deleted from zk dead server path success" , serverType , host);
}
} }
} }
} }
...@@ -143,8 +138,8 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{ ...@@ -143,8 +138,8 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
// create temporary sequence nodes for master znode // create temporary sequence nodes for master znode
String parentPath = getZNodeParentPath(zkNodeType); String parentPath = getZNodeParentPath(zkNodeType);
String serverPathPrefix = parentPath + "/" + OSUtils.getHost(); String serverPathPrefix = parentPath + "/" + OSUtils.getHost();
String registerPath = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath( String registerPath = serverPathPrefix + UNDERLINE;
serverPathPrefix + UNDERLINE, heartbeatZKInfo.getBytes()); super.persistEphemeral(registerPath, heartbeatZKInfo);
logger.info("register {} node {} success" , zkNodeType.toString(), registerPath); logger.info("register {} node {} success" , zkNodeType.toString(), registerPath);
return registerPath; return registerPath;
} }
...@@ -165,7 +160,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{ ...@@ -165,7 +160,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
} }
registerPath = createZNodePath(zkNodeType); registerPath = createZNodePath(zkNodeType);
// handle dead server // handle dead server
handleDeadServer(registerPath, zkNodeType, Constants.DELETE_ZK_OP); handleDeadServer(registerPath, zkNodeType, Constants.DELETE_ZK_OP);
return registerPath; return registerPath;
...@@ -196,10 +191,10 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{ ...@@ -196,10 +191,10 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
}else if(opType.equals(ADD_ZK_OP)){ }else if(opType.equals(ADD_ZK_OP)){
String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo; String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo;
if(zkClient.checkExists().forPath(deadServerPath) == null){ if(!super.isExisted(deadServerPath)){
//add dead server info to zk dead server path : /dead-servers/ //add dead server info to zk dead server path : /dead-servers/
zkClient.create().forPath(deadServerPath,(type + UNDERLINE + ipSeqNo).getBytes()); super.persist(deadServerPath,(type + UNDERLINE + ipSeqNo));
logger.info("{} server dead , and {} added to zk dead server path success" , logger.info("{} server dead , and {} added to zk dead server path success" ,
zkNodeType.toString(), zNode); zkNodeType.toString(), zNode);
...@@ -226,19 +221,13 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{ ...@@ -226,19 +221,13 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
List<String> childrenList = new ArrayList<>(); List<String> childrenList = new ArrayList<>();
try { try {
// read master node parent path from conf // read master node parent path from conf
if(zkClient.checkExists().forPath(getZNodeParentPath(ZKNodeType.MASTER)) != null){ if(super.isExisted(getZNodeParentPath(ZKNodeType.MASTER))){
childrenList = zkClient.getChildren().forPath(getZNodeParentPath(ZKNodeType.MASTER)); childrenList = super.getChildrenKeys(getZNodeParentPath(ZKNodeType.MASTER));
} }
} catch (Exception e) { } catch (Exception e) {
if(e.getMessage().contains("java.lang.IllegalStateException: instance must be started")){ logger.error("getActiveMasterNum error",e);
logger.error("zookeeper service not started",e);
}else{
logger.error(e.getMessage(),e);
}
}finally {
return childrenList.size();
} }
return childrenList.size();
} }
/** /**
...@@ -280,10 +269,9 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{ ...@@ -280,10 +269,9 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
Map<String, String> masterMap = new HashMap<>(); Map<String, String> masterMap = new HashMap<>();
try { try {
String path = getZNodeParentPath(zkNodeType); String path = getZNodeParentPath(zkNodeType);
List<String> serverList = getZkClient().getChildren().forPath(path); List<String> serverList = super.getChildrenKeys(path);
for(String server : serverList){ for(String server : serverList){
byte[] bytes = getZkClient().getData().forPath(path + "/" + server); masterMap.putIfAbsent(server, super.get(path + "/" + server));
masterMap.putIfAbsent(server, new String(bytes));
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("get server list failed : " + e.getMessage(), e); logger.error("get server list failed : " + e.getMessage(), e);
...@@ -430,27 +418,15 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{ ...@@ -430,27 +418,15 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
*/ */
protected void initSystemZNode(){ protected void initSystemZNode(){
try { try {
createNodePath(getMasterZNodeParentPath()); persist(getMasterZNodeParentPath(), "");
createNodePath(getWorkerZNodeParentPath()); persist(getWorkerZNodeParentPath(), "");
createNodePath(getDeadZNodeParentPath()); persist(getDeadZNodeParentPath(), "");
} catch (Exception e) { } catch (Exception e) {
logger.error("init system znode failed : " + e.getMessage(),e); logger.error("init system znode failed : " + e.getMessage(),e);
} }
} }
/**
* create zookeeper node path if not exists
* @param zNodeParentPath zookeeper parent path
* @throws Exception errors
*/
private void createNodePath(String zNodeParentPath) throws Exception {
if(null == zkClient.checkExists().forPath(zNodeParentPath)){
zkClient.create().creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT).forPath(zNodeParentPath);
}
}
/** /**
* server self dead, stop all threads * server self dead, stop all threads
* @param serverHost server host * @param serverHost server host
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册