未验证 提交 ab8f8786 编写于 作者: J JinYong Li 提交者: GitHub

fix heartBeatTaskCount bug (#12024)

上级 bc629f8f
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.HeartBeat;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Master heart beat task.
 *
 * <p>Each execution of {@link #run()} first asks the registry whether any of the configured
 * heartbeat paths has been marked as a dead server of type {@code Constants.MASTER_TYPE}.
 * If so, the registry's stoppable is asked to stop and nothing is written. Otherwise the
 * encoded heartbeat is written to every path via {@code persistEphemeral}.
 *
 * <p>The task keeps a counter of consecutive failed executions. The counter is only used for
 * logging; it is reset to zero after every successful execution.
 */
public class MasterHeartBeatTask implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(MasterHeartBeatTask.class);

    /** Registry paths this master reports its heartbeat to. */
    private final Set<String> heartBeatPaths;

    /** Client used for the dead-server check and for writing the heartbeat. */
    private final RegistryClient registryClient;

    /** Heartbeat payload holder; encoded on every write. */
    private final HeartBeat heartBeat;

    /** Number of consecutive executions of {@link #run()} that ended with a throwable. */
    private final AtomicInteger heartBeatErrorTimes = new AtomicInteger();

    /**
     * Creates a heartbeat task for a master node.
     *
     * @param startupTime    passed through to the {@link HeartBeat} payload
     * @param maxCpuloadAvg  passed through to the {@link HeartBeat} payload
     * @param reservedMemory passed through to the {@link HeartBeat} payload
     * @param heartBeatPaths registry paths to check and to write the heartbeat to
     * @param registryClient registry client used for all registry operations
     */
    public MasterHeartBeatTask(long startupTime,
                               double maxCpuloadAvg,
                               double reservedMemory,
                               Set<String> heartBeatPaths,
                               RegistryClient registryClient) {
        this.heartBeatPaths = heartBeatPaths;
        this.registryClient = registryClient;
        this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory);
    }

    /**
     * Returns the current heartbeat payload in its encoded form.
     *
     * @return the result of {@code HeartBeat#encodeHeartBeat()}
     */
    public String getHeartBeatInfo() {
        return this.heartBeat.encodeHeartBeat();
    }

    /**
     * Performs one heartbeat round: dead-server check first, then the heartbeat write.
     * Any throwable is caught and logged together with the consecutive failure count.
     */
    @Override
    public void run() {
        try {
            // If the registry already considers this node dead, stop instead of reporting.
            for (String heartBeatPath : heartBeatPaths) {
                if (registryClient.checkIsDeadServer(heartBeatPath, Constants.MASTER_TYPE)) {
                    registryClient.getStoppable().stop("i was judged to death, release resources and stop myself");
                    return;
                }
            }

            for (String heartBeatPath : heartBeatPaths) {
                registryClient.persistEphemeral(heartBeatPath, heartBeat.encodeHeartBeat());
            }

            // A full successful round ends the current failure streak.
            heartBeatErrorTimes.set(0);
        } catch (Throwable ex) {
            // Count the failure before logging; previously the counter was never incremented,
            // so the logged errorTimes was always 0.
            // NOTE(review): the broad catch is kept as-is, presumably so that a periodic
            // scheduler keeps invoking this task after a failure - confirm against the caller.
            logger.error("HeartBeat task execute failed, errorTimes: {}", heartBeatErrorTimes.incrementAndGet(), ex);
        }
    }
}
...@@ -30,7 +30,6 @@ import org.apache.dolphinscheduler.registry.api.RegistryException; ...@@ -30,7 +30,6 @@ import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.service.FailoverService; import org.apache.dolphinscheduler.server.master.service.FailoverService;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.service.registry.RegistryClient; import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
...@@ -50,7 +49,7 @@ import com.google.common.collect.Sets; ...@@ -50,7 +49,7 @@ import com.google.common.collect.Sets;
/** /**
* <p>DolphinScheduler master register client, used to connect to registry and hand the registry events. * <p>DolphinScheduler master register client, used to connect to registry and hand the registry events.
* <p>When the Master node startup, it will register in registry center. And schedule a {@link HeartBeatTask} to update its metadata in registry. * <p>When the Master node startup, it will register in registry center. And schedule a {@link MasterHeartBeatTask} to update its metadata in registry.
*/ */
@Component @Component
public class MasterRegistryClient implements AutoCloseable { public class MasterRegistryClient implements AutoCloseable {
...@@ -97,7 +96,7 @@ public class MasterRegistryClient implements AutoCloseable { ...@@ -97,7 +96,7 @@ public class MasterRegistryClient implements AutoCloseable {
// master registry // master registry
registry(); registry();
registryClient.addConnectionStateListener(new MasterConnectionStateListener(getCurrentNodePath(), registryClient.addConnectionStateListener(new MasterConnectionStateListener(getCurrentNodePath(),
registryClient)); registryClient));
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener()); registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
} catch (Exception e) { } catch (Exception e) {
throw new RegistryException("Master registry client start up error", e); throw new RegistryException("Master registry client start up error", e);
...@@ -190,13 +189,11 @@ public class MasterRegistryClient implements AutoCloseable { ...@@ -190,13 +189,11 @@ public class MasterRegistryClient implements AutoCloseable {
logger.info("Master node : {} registering to registry center", masterAddress); logger.info("Master node : {} registering to registry center", masterAddress);
String localNodePath = getCurrentNodePath(); String localNodePath = getCurrentNodePath();
Duration masterHeartbeatInterval = masterConfig.getHeartbeatInterval(); Duration masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime, MasterHeartBeatTask heartBeatTask = new MasterHeartBeatTask(startupTime,
masterConfig.getMaxCpuLoadAvg(), masterConfig.getMaxCpuLoadAvg(),
masterConfig.getReservedMemory(), masterConfig.getReservedMemory(),
Sets.newHashSet(localNodePath), Sets.newHashSet(localNodePath),
Constants.MASTER_TYPE, registryClient);
registryClient,
masterConfig.getHeartbeatErrorThreshold());
// remove before persist // remove before persist
registryClient.remove(localNodePath); registryClient.remove(localNodePath);
...@@ -247,4 +244,4 @@ public class MasterRegistryClient implements AutoCloseable { ...@@ -247,4 +244,4 @@ public class MasterRegistryClient implements AutoCloseable {
return NetUtils.getAddr(masterConfig.getListenPort()); return NetUtils.getAddr(masterConfig.getListenPort());
} }
} }
\ No newline at end of file
...@@ -15,64 +15,43 @@ ...@@ -15,64 +15,43 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.registry; package org.apache.dolphinscheduler.server.worker.registry;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.HeartBeat; import org.apache.dolphinscheduler.common.utils.HeartBeat;
import org.apache.dolphinscheduler.service.registry.RegistryClient; import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* Heart beat task * Heart beat task
*/ */
public class HeartBeatTask implements Runnable { public class WorkerHeartBeatTask implements Runnable {
private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class); private final Logger logger = LoggerFactory.getLogger(WorkerHeartBeatTask.class);
private final Set<String> heartBeatPaths; private final Set<String> heartBeatPaths;
private final RegistryClient registryClient; private final RegistryClient registryClient;
private int workerWaitingTaskCount; private int workerWaitingTaskCount;
private final String serverType;
private final HeartBeat heartBeat; private final HeartBeat heartBeat;
private final int heartBeatErrorThreshold;
private final AtomicInteger heartBeatErrorTimes = new AtomicInteger(); private final AtomicInteger heartBeatErrorTimes = new AtomicInteger();
public HeartBeatTask(long startupTime, public WorkerHeartBeatTask(long startupTime,
double maxCpuloadAvg,
double reservedMemory,
Set<String> heartBeatPaths,
String serverType,
RegistryClient registryClient,
int heartBeatErrorThreshold) {
this.heartBeatPaths = heartBeatPaths;
this.registryClient = registryClient;
this.serverType = serverType;
this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory);
this.heartBeatErrorThreshold = heartBeatErrorThreshold;
}
public HeartBeatTask(long startupTime,
double maxCpuloadAvg, double maxCpuloadAvg,
double reservedMemory, double reservedMemory,
int hostWeight, int hostWeight,
Set<String> heartBeatPaths, Set<String> heartBeatPaths,
String serverType,
RegistryClient registryClient, RegistryClient registryClient,
int workerThreadCount, int workerThreadCount,
int workerWaitingTaskCount, int workerWaitingTaskCount) {
int heartBeatErrorThreshold) {
this.heartBeatPaths = heartBeatPaths; this.heartBeatPaths = heartBeatPaths;
this.registryClient = registryClient; this.registryClient = registryClient;
this.workerWaitingTaskCount = workerWaitingTaskCount; this.workerWaitingTaskCount = workerWaitingTaskCount;
this.serverType = serverType;
this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory, hostWeight, workerThreadCount); this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory, hostWeight, workerThreadCount);
this.heartBeatErrorThreshold = heartBeatErrorThreshold;
} }
public String getHeartBeatInfo() { public String getHeartBeatInfo() {
...@@ -82,14 +61,12 @@ public class HeartBeatTask implements Runnable { ...@@ -82,14 +61,12 @@ public class HeartBeatTask implements Runnable {
@Override @Override
public void run() { public void run() {
try { try {
// check dead or not in zookeeper
for (String heartBeatPath : heartBeatPaths) { for (String heartBeatPath : heartBeatPaths) {
if (registryClient.checkIsDeadServer(heartBeatPath, serverType)) { if (registryClient.checkIsDeadServer(heartBeatPath, Constants.WORKER_TYPE)) {
registryClient.getStoppable().stop("i was judged to death, release resources and stop myself"); registryClient.getStoppable().stop("i was judged to death, release resources and stop myself");
return; return;
} }
} }
// update waiting task count // update waiting task count
heartBeat.setWorkerWaitingTaskCount(workerWaitingTaskCount); heartBeat.setWorkerWaitingTaskCount(workerWaitingTaskCount);
...@@ -98,11 +75,7 @@ public class HeartBeatTask implements Runnable { ...@@ -98,11 +75,7 @@ public class HeartBeatTask implements Runnable {
} }
heartBeatErrorTimes.set(0); heartBeatErrorTimes.set(0);
} catch (Throwable ex) { } catch (Throwable ex) {
logger.error("HeartBeat task execute failed", ex); logger.error("HeartBeat task execute failed, errorTimes: {}", heartBeatErrorTimes.get(), ex);
if (heartBeatErrorTimes.incrementAndGet() >= heartBeatErrorThreshold) {
registryClient.getStoppable()
.stop("HeartBeat task connect to zk failed too much times: " + heartBeatErrorTimes);
}
} }
} }
} }
...@@ -28,7 +28,6 @@ import org.apache.dolphinscheduler.common.enums.NodeType; ...@@ -28,7 +28,6 @@ import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.registry.RegistryClient; import org.apache.dolphinscheduler.service.registry.RegistryClient;
...@@ -101,16 +100,14 @@ public class WorkerRegistryClient implements AutoCloseable { ...@@ -101,16 +100,14 @@ public class WorkerRegistryClient implements AutoCloseable {
Set<String> workerZkPaths = getWorkerZkPaths(); Set<String> workerZkPaths = getWorkerZkPaths();
long workerHeartbeatInterval = workerConfig.getHeartbeatInterval().getSeconds(); long workerHeartbeatInterval = workerConfig.getHeartbeatInterval().getSeconds();
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime, WorkerHeartBeatTask heartBeatTask = new WorkerHeartBeatTask(startupTime,
workerConfig.getMaxCpuLoadAvg(), workerConfig.getMaxCpuLoadAvg(),
workerConfig.getReservedMemory(), workerConfig.getReservedMemory(),
workerConfig.getHostWeight(), workerConfig.getHostWeight(),
workerZkPaths, workerZkPaths,
Constants.WORKER_TYPE, registryClient,
registryClient, workerConfig.getExecThreads(),
workerConfig.getExecThreads(), workerManagerThread.getThreadPoolQueueSize());
workerManagerThread.getThreadPoolQueueSize(),
workerConfig.getHeartbeatErrorThreshold());
for (String workerZKPath : workerZkPaths) { for (String workerZKPath : workerZkPaths) {
// remove before persist // remove before persist
...@@ -199,4 +196,4 @@ public class WorkerRegistryClient implements AutoCloseable { ...@@ -199,4 +196,4 @@ public class WorkerRegistryClient implements AutoCloseable {
unRegistry(); unRegistry();
} }
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册