未验证 提交 f5087886 编写于 作者: K Kirs 提交者: GitHub

[BUG-#5678][Registry]fix registry init node miss (#5686)

上级 93f1d4df
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.registry; package org.apache.dolphinscheduler.server.master.registry;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE; import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
...@@ -134,18 +135,6 @@ public class MasterRegistryClient { ...@@ -134,18 +135,6 @@ public class MasterRegistryClient {
unRegistry(); unRegistry();
} }
/**
* init system node
*/
private void initMasterSystemNode() {
try {
registryClient.persist(Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS, "");
logger.info("initialize master server nodes success.");
} catch (Exception e) {
logger.error("init system node failed", e);
}
}
/** /**
* remove zookeeper node path * remove zookeeper node path
* *
...@@ -346,7 +335,6 @@ public class MasterRegistryClient { ...@@ -346,7 +335,6 @@ public class MasterRegistryClient {
* registry * registry
*/ */
public void registry() { public void registry() {
initMasterSystemNode();
String address = NetUtils.getAddr(masterConfig.getListenPort()); String address = NetUtils.getAddr(masterConfig.getListenPort());
localNodePath = getMasterPath(); localNodePath = getMasterPath();
registryClient.persistEphemeral(localNodePath, ""); registryClient.persistEphemeral(localNodePath, "");
...@@ -395,7 +383,7 @@ public class MasterRegistryClient { ...@@ -395,7 +383,7 @@ public class MasterRegistryClient {
*/ */
public String getMasterPath() { public String getMasterPath() {
String address = getLocalAddress(); String address = getLocalAddress();
return registryClient.getMasterPath() + "/" + address; return REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + address;
} }
/** /**
......
...@@ -17,6 +17,9 @@ ...@@ -17,6 +17,9 @@
package org.apache.dolphinscheduler.server.master.registry; package org.apache.dolphinscheduler.server.master.registry;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
...@@ -131,11 +134,11 @@ public class ServerNodeManager implements InitializingBean { ...@@ -131,11 +134,11 @@ public class ServerNodeManager implements InitializingBean {
/** /**
* init MasterNodeListener listener * init MasterNodeListener listener
*/ */
registryClient.subscribe(registryClient.getMasterPath(), new MasterDataListener()); registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_MASTERS, new MasterDataListener());
/** /**
* init WorkerNodeListener listener * init WorkerNodeListener listener
*/ */
registryClient.subscribe(registryClient.getWorkerPath(), new MasterDataListener()); registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_WORKERS, new MasterDataListener());
} }
/** /**
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.worker.registry; package org.apache.dolphinscheduler.server.worker.registry;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import static org.apache.dolphinscheduler.common.Constants.SLASH; import static org.apache.dolphinscheduler.common.Constants.SLASH;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
...@@ -130,7 +131,7 @@ public class WorkerRegistryClient { ...@@ -130,7 +131,7 @@ public class WorkerRegistryClient {
public Set<String> getWorkerZkPaths() { public Set<String> getWorkerZkPaths() {
Set<String> workerPaths = Sets.newHashSet(); Set<String> workerPaths = Sets.newHashSet();
String address = getLocalAddress(); String address = getLocalAddress();
String workerZkPathPrefix = registryClient.getWorkerPath(); String workerZkPathPrefix = REGISTRY_DOLPHINSCHEDULER_WORKERS;
for (String workGroup : this.workerGroups) { for (String workGroup : this.workerGroups) {
StringJoiner workerPathJoiner = new StringJoiner(SLASH); StringJoiner workerPathJoiner = new StringJoiner(SLASH);
......
...@@ -71,8 +71,6 @@ public class WorkerRegistryClientTest { ...@@ -71,8 +71,6 @@ public class WorkerRegistryClientTest {
@Before @Before
public void before() { public void before() {
given(registryClient.getWorkerPath()).willReturn("/nodes/worker");
given(workerConfig.getWorkerGroups()).willReturn(Sets.newHashSet("127.0.0.1")); given(workerConfig.getWorkerGroups()).willReturn(Sets.newHashSet("127.0.0.1"));
//given(heartBeatExecutor.getWorkerGroups()).willReturn(Sets.newHashSet("127.0.0.1")); //given(heartBeatExecutor.getWorkerGroups()).willReturn(Sets.newHashSet("127.0.0.1"));
//scheduleAtFixedRate //scheduleAtFixedRate
......
...@@ -18,6 +18,8 @@ ...@@ -18,6 +18,8 @@
package org.apache.dolphinscheduler.service.registry; package org.apache.dolphinscheduler.service.registry;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS; import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.common.utils.PropertyUtils;
...@@ -57,16 +59,7 @@ public class RegistryCenter { ...@@ -57,16 +59,7 @@ public class RegistryCenter {
*/ */
protected static String NODES; protected static String NODES;
/**
* master path
*/
protected static String MASTER_PATH = "/nodes/master";
private RegistryPluginManager registryPluginManager; private RegistryPluginManager registryPluginManager;
/**
* worker path
*/
protected static String WORKER_PATH = "/nodes/worker";
protected static final String EMPTY = ""; protected static final String EMPTY = "";
...@@ -113,8 +106,9 @@ public class RegistryCenter { ...@@ -113,8 +106,9 @@ public class RegistryCenter {
* init nodes * init nodes
*/ */
private void initNodes() { private void initNodes() {
persist(MASTER_PATH, EMPTY); persist(REGISTRY_DOLPHINSCHEDULER_MASTERS, EMPTY);
persist(WORKER_PATH, EMPTY); persist(REGISTRY_DOLPHINSCHEDULER_WORKERS, EMPTY);
persist(REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS, EMPTY);
} }
/** /**
...@@ -205,15 +199,6 @@ public class RegistryCenter { ...@@ -205,15 +199,6 @@ public class RegistryCenter {
return stoppable; return stoppable;
} }
/**
* get master path
*
* @return master path
*/
public String getMasterPath() {
return MASTER_PATH;
}
/** /**
* whether master path * whether master path
* *
...@@ -221,16 +206,7 @@ public class RegistryCenter { ...@@ -221,16 +206,7 @@ public class RegistryCenter {
* @return result * @return result
*/ */
public boolean isMasterPath(String path) { public boolean isMasterPath(String path) {
return path != null && path.contains(MASTER_PATH); return path != null && path.contains(REGISTRY_DOLPHINSCHEDULER_MASTERS);
}
/**
* get worker path
*
* @return worker path
*/
public String getWorkerPath() {
return WORKER_PATH;
} }
/** /**
...@@ -240,7 +216,7 @@ public class RegistryCenter { ...@@ -240,7 +216,7 @@ public class RegistryCenter {
* @return worker group path * @return worker group path
*/ */
public String getWorkerGroupPath(String workerGroup) { public String getWorkerGroupPath(String workerGroup) {
return WORKER_PATH + "/" + workerGroup; return REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerGroup;
} }
/** /**
...@@ -250,7 +226,7 @@ public class RegistryCenter { ...@@ -250,7 +226,7 @@ public class RegistryCenter {
* @return result * @return result
*/ */
public boolean isWorkerPath(String path) { public boolean isWorkerPath(String path) {
return path != null && path.contains(WORKER_PATH); return path != null && path.contains(REGISTRY_DOLPHINSCHEDULER_WORKERS);
} }
/** /**
......
...@@ -22,6 +22,8 @@ import static org.apache.dolphinscheduler.common.Constants.COLON; ...@@ -22,6 +22,8 @@ import static org.apache.dolphinscheduler.common.Constants.COLON;
import static org.apache.dolphinscheduler.common.Constants.DELETE_OP; import static org.apache.dolphinscheduler.common.Constants.DELETE_OP;
import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING; import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING;
import static org.apache.dolphinscheduler.common.Constants.MASTER_TYPE; import static org.apache.dolphinscheduler.common.Constants.MASTER_TYPE;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import static org.apache.dolphinscheduler.common.Constants.WORKER_TYPE; import static org.apache.dolphinscheduler.common.Constants.WORKER_TYPE;
...@@ -344,7 +346,7 @@ public class RegistryClient extends RegistryCenter { ...@@ -344,7 +346,7 @@ public class RegistryClient extends RegistryCenter {
* @return master nodes * @return master nodes
*/ */
public Set<String> getMasterNodesDirectly() { public Set<String> getMasterNodesDirectly() {
List<String> masters = getChildrenKeys(MASTER_PATH); List<String> masters = getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_MASTERS);
return new HashSet<>(masters); return new HashSet<>(masters);
} }
...@@ -354,7 +356,7 @@ public class RegistryClient extends RegistryCenter { ...@@ -354,7 +356,7 @@ public class RegistryClient extends RegistryCenter {
* @return master nodes * @return master nodes
*/ */
public Set<String> getWorkerNodesDirectly() { public Set<String> getWorkerNodesDirectly() {
List<String> workers = getChildrenKeys(WORKER_PATH); List<String> workers = getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS);
return new HashSet<>(workers); return new HashSet<>(workers);
} }
...@@ -364,7 +366,7 @@ public class RegistryClient extends RegistryCenter { ...@@ -364,7 +366,7 @@ public class RegistryClient extends RegistryCenter {
* @return worker group nodes * @return worker group nodes
*/ */
public Set<String> getWorkerGroupDirectly() { public Set<String> getWorkerGroupDirectly() {
List<String> workers = getChildrenKeys(getWorkerPath()); List<String> workers = getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS);
return new HashSet<>(workers); return new HashSet<>(workers);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册