提交 c8906686 编写于 作者: L lidongdai

优化worker server获取任务的性能

上级 76323482
......@@ -3,10 +3,10 @@ alert.type=EMAIL
# mail server configuration
#xls file path; it needs to be created if it does not exist
......@@ -192,6 +192,11 @@ public final class Constants {
public static final String SEMICOLON = ";";
* DOT .
public static final String DOT = ".";
......@@ -822,6 +827,7 @@ public final class Constants {
* default worker group id
public static final int DEFAULT_WORKER_ID = -1;
......@@ -24,20 +24,17 @@ public interface ITaskQueue {
* take out all the elements
* this method is deprecated
* use checkTaskExists instead
* @param key
* @return
List<String> getAllTasks(String key);
* check task exists in the task queue or not
* @param key queue name
* @param task ${priority}_${processInstanceId}_${taskId}
* @param task ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
* @return true if exists in the queue
boolean checkTaskExists(String key, String task);
......@@ -54,10 +51,10 @@ public interface ITaskQueue {
* an element pops out of the queue
* @param key queue name
* @param remove whether remove the element
* @param n how many elements to poll
* @return
String poll(String key, boolean remove);
List<String> poll(String key, int n);
* remove an element from the queue
......@@ -42,7 +42,7 @@ public class TaskQueueFactory {
public static ITaskQueue getTaskQueueInstance() {
String queueImplValue = CommonUtils.getQueueImplValue();
if (StringUtils.isNotBlank(queueImplValue)) {
// queueImplValue = StringUtils.trim(queueImplValue);
// queueImplValue = IpUtils.trim(queueImplValue);
// if (SCHEDULER_QUEUE_REDIS_IMPL.equals(queueImplValue)) {
// logger.info("task queue impl use reids ");
......@@ -19,6 +19,8 @@ package cn.escheduler.common.queue;
import cn.escheduler.common.Constants;
import cn.escheduler.common.utils.Bytes;
import cn.escheduler.common.utils.IpUtils;
import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.common.zk.AbstractZKClient;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
......@@ -26,10 +28,7 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
* A singleton of a task queue implemented with zookeeper
......@@ -62,7 +61,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
* @param key task queue name
* @return
public List<String> getAllTasks(String key) {
try {
......@@ -80,7 +78,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
* check task exists in the task queue or not
* @param key queue name
* @param task ${priority}_${processInstanceId}_${taskId}
* @param task ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
* @return true if exists in the queue
......@@ -110,7 +108,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
* add task to tasks queue
* @param key task queue name
* @param value ${priority}_${processInstanceId}_${taskId}
* @param value ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,...
public void add(String key, String value) {
......@@ -129,16 +127,16 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
* An element pops out of the queue <p>
* note:
* ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
* ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,...
* The tasks with the highest priority are selected by comparing the priorities of the above four levels from high to low.
* 流程实例优先级_流程实例id_任务优先级_任务id high <- low
* 流程优先级_流程实例id_任务优先级_任务id_任务执行的机器id1,任务执行的机器id2,... high <- low
* @param key task queue name
* @param remove whether remove the element
* @return the task id to be executed
* @param tasksNum how many elements to poll
* @return the task ids to be executed
public String poll(String key, boolean remove) {
public List<String> poll(String key, int tasksNum) {
CuratorFramework zk = getZkClient();
String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH;
......@@ -146,55 +144,79 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
if(list != null && list.size() > 0){
String workerIp = OSUtils.getHost();
String workerIpLongStr = String.valueOf(IpUtils.ipToLong(workerIp));
int size = list.size();
String formatTargetTask = null;
String targetTaskKey = null;
Set<String> taskTreeSet = new TreeSet<>();
for (int i = 0; i < size; i++) {
String taskDetail = list.get(i);
String[] taskDetailArrs = taskDetail.split(Constants.UNDERLINE);
if(taskDetailArrs.length == 4){
if(taskDetailArrs.length >= 4){
//format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
String formatTask = String.format("%s_%010d_%s_%010d", taskDetailArrs[0], Long.parseLong(taskDetailArrs[1]), taskDetailArrs[2], Long.parseLong(taskDetailArrs[3]));
if(i > 0){
int result = formatTask.compareTo(formatTargetTask);
if(result < 0){
formatTargetTask = formatTask;
targetTaskKey = taskDetail;
if(taskDetailArrs.length > 4){
String taskHosts = taskDetailArrs[4];
//the task can be assigned to any worker host if its host equals the default ip value of the worker server
String[] taskHostsArr = taskHosts.split(Constants.COMMA);
formatTargetTask = formatTask;
targetTaskKey = taskDetail;
logger.error("task queue poll error, task detail :{} , please check!", taskDetail);
if(formatTargetTask != null){
String taskIdPath = tasksQueuePath + targetTaskKey;
logger.info("consume task {}", taskIdPath);
List<String> taskslist = getTasksListFromTreeSet(tasksNum, taskTreeSet);
String[] vals = targetTaskKey.split(Constants.UNDERLINE);
logger.info("consume tasks: {},there still have {} tasks need to be executed", Arrays.toString(taskslist.toArray()), size - taskslist.size());
removeNode(key, targetTaskKey);
logger.info("consume task: {},there still have {} tasks need to be executed", vals[vals.length - 1], size - 1);
return targetTaskKey;
logger.error("should not go here, task queue poll error, please check!");
return taskslist;
} catch (Exception e) {
logger.error("add task to tasks queue exception",e);
return null;
return new ArrayList<String>();
* get task list from tree set
* @param tasksNum
* @param taskTreeSet
public List<String> getTasksListFromTreeSet(int tasksNum, Set<String> taskTreeSet) {
Iterator<String> iterator = taskTreeSet.iterator();
int j = 0;
List<String> taskslist = new ArrayList<>(tasksNum);
if(j++ < tasksNum){
String task = iterator.next();
return taskslist;
public void removeNode(String key, String nodeValue){
......@@ -312,7 +312,11 @@ public abstract class AbstractZKClient {
childrenList = zkClient.getChildren().forPath(masterZNodeParentPath);
} catch (Exception e) {
// logger.warn(e.getMessage());
if(!e.getMessage().contains("java.lang.IllegalStateException: instance must be started")){
return childrenList.size();
return childrenList.size();
......@@ -37,6 +37,12 @@ public class OSUtilsTest {
// static HardwareAbstractionLayer hal = si.getHardware();
public void getHost(){
public void memoryUsage() {
logger.info("memoryUsage : {}", OSUtils.memoryUsage());// 0.3361799418926239
......@@ -49,9 +49,11 @@ public class TaskQueueImplTest {
String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1).get(0);
String node2 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
String node2 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1).get(0);
......@@ -99,7 +101,7 @@ public class TaskQueueImplTest {
String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1).get(0);
//clear all data
......@@ -24,6 +24,7 @@ import cn.escheduler.common.queue.ITaskQueue;
import cn.escheduler.common.queue.TaskQueueFactory;
import cn.escheduler.common.task.subprocess.SubProcessParameters;
import cn.escheduler.common.utils.DateUtils;
import cn.escheduler.common.utils.IpUtils;
import cn.escheduler.common.utils.JSONUtils;
import cn.escheduler.common.utils.ParameterUtils;
import cn.escheduler.dao.mapper.*;
......@@ -108,7 +109,7 @@ public class ProcessDao extends AbstractBaseDao {
protected void init() {
userMapper = getMapper(UserMapper.class);
processDefineMapper = getMapper(ProcessDefinitionMapper.class);
processInstanceMapper = getMapper(ProcessInstanceMapper.class);
dataSourceMapper = getMapper(DataSourceMapper.class);
......@@ -947,11 +948,58 @@ public class ProcessDao extends AbstractBaseDao {
* 流程实例优先级_流程实例id_任务优先级_任务id high <- low
* @param task
* @param taskInstance
* @return
private String taskZkInfo(TaskInstance task) {
return String.valueOf(task.getProcessInstancePriority().ordinal()) + Constants.UNDERLINE + task.getProcessInstanceId() + Constants.UNDERLINE + task.getTaskInstancePriority().ordinal() + Constants.UNDERLINE + task.getId();
private String taskZkInfo(TaskInstance taskInstance) {
int taskWorkerGroupId = getTaskWorkerGroupId(taskInstance);
StringBuilder sb = new StringBuilder(100);
if(taskWorkerGroupId > 0){
//not to find data from db
WorkerGroup workerGroup = queryWorkerGroupById(taskWorkerGroupId);
if(workerGroup == null ){
logger.info("task {} cannot find the worker group, use all worker instead.", taskInstance.getId());
return sb.toString();
String ips = workerGroup.getIpList();
logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers",
taskInstance.getId(), workerGroup.getId());
return sb.toString();
StringBuilder ipSb = new StringBuilder(100);
String[] ipArray = ips.split(COMMA);
for (String ip : ipArray) {
long ipLong = IpUtils.ipToLong(ip);
if(ipSb.length() > 0) {
ipSb.deleteCharAt(ipSb.length() - 1);
return sb.toString();
......@@ -1591,5 +1639,23 @@ public class ProcessDao extends AbstractBaseDao {
* get task worker group id
* @param taskInstance
* @return
public int getTaskWorkerGroupId(TaskInstance taskInstance) {
int taskWorkerGroupId = taskInstance.getWorkerGroupId();
ProcessInstance processInstance = findProcessInstanceByTaskId(taskInstance.getId());
if(processInstance == null){
logger.error("cannot find the task:{} process instance", taskInstance.getId());
int processWorkerGroupId = processInstance.getWorkerGroupId();
taskWorkerGroupId = (taskWorkerGroupId <= 0 ? processWorkerGroupId : taskWorkerGroupId);
return taskWorkerGroupId;
......@@ -28,8 +28,9 @@ import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.dao.model.TaskInstance;
import cn.escheduler.dao.model.WorkerGroup;
import cn.escheduler.server.zk.ZKWorkerClient;
import com.cronutils.utils.StringUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -101,15 +102,7 @@ public class FetchTaskThread implements Runnable{
private boolean checkWorkerGroup(TaskInstance taskInstance, String host){
int taskWorkerGroupId = taskInstance.getWorkerGroupId();
ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId());
if(processInstance == null){
logger.error("cannot find the task:{} process instance", taskInstance.getId());
return false;
int processWorkerGroupId = processInstance.getWorkerGroupId();
taskWorkerGroupId = (taskWorkerGroupId <= 0 ? processWorkerGroupId : taskWorkerGroupId);
int taskWorkerGroupId = processDao.getTaskWorkerGroupId(taskInstance);
if(taskWorkerGroupId <= 0){
return true;
......@@ -120,118 +113,124 @@ public class FetchTaskThread implements Runnable{
return true;
String ips = workerGroup.getIpList();
if(ips == null){
logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers",
taskInstance.getId(), workerGroup.getId());
String[] ipArray = ips.split(",");
String[] ipArray = ips.split(Constants.COMMA);
List<String> ipList = Arrays.asList(ipArray);
return ipList.contains(host);
public void run() {
while (Stopper.isRunning()){
InterProcessMutex mutex = null;
try {
if(OSUtils.checkResource(this.conf, false)) {
// creating distributed locks, lock path /escheduler/lock/worker
String zNodeLockPath = zkWorkerClient.getWorkerLockPath();
mutex = new InterProcessMutex(zkWorkerClient.getZkClient(), zNodeLockPath);
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService;
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService;
//check memory and cpu usage and threads
if(OSUtils.checkResource(this.conf, false) && checkThreadCount(poolExecutor)) {
for (int i = 0; i < taskNum; i++) {
int activeCount = poolExecutor.getActiveCount();
if (activeCount >= workerExecNums) {
logger.info("thread insufficient , activeCount : {} , workerExecNums : {}",activeCount,workerExecNums);
//check whether there are any tasks; if there are none, no lock is needed //get all tasks
List<String> tasksQueueList = taskQueue.getAllTasks(Constants.SCHEDULER_TASKS_QUEUE);
if(tasksQueueList.size() > 0){
// creating distributed locks, lock path /escheduler/lock/worker
String zNodeLockPath = zkWorkerClient.getWorkerLockPath();
mutex = new InterProcessMutex(zkWorkerClient.getZkClient(), zNodeLockPath);
// task instance id str
String taskQueueStr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
List<String> taskQueueStrArr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, taskNum);
if (!StringUtils.isEmpty(taskQueueStr )) {
for(String taskQueueStr : taskQueueStrArr){
if (StringUtils.isNotBlank(taskQueueStr )) {
String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE);
String taskInstIdStr = taskStringArray[taskStringArray.length - 1];
Date now = new Date();
Integer taskId = Integer.parseInt(taskInstIdStr);
if (!checkThreadCount(poolExecutor)) {
// find task instance by task id
TaskInstance taskInstance = processDao.findTaskInstanceById(taskId);
String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE);
String taskInstIdStr = taskStringArray[taskStringArray.length - 1];
Date now = new Date();
Integer taskId = Integer.parseInt(taskInstIdStr);
logger.info("worker fetch taskId : {} from queue ", taskId);
// find task instance by task id
TaskInstance taskInstance = processDao.findTaskInstanceById(taskId);
int retryTimes = 30;
// mainly to wait for the master insert task to succeed
while (taskInstance == null && retryTimes > 0) {
taskInstance = processDao.findTaskInstanceById(taskId);
logger.info("worker fetch taskId : {} from queue ", taskId);
if (taskInstance == null ) {
logger.error("task instance is null. task id : {} ", taskId);
if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){
taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr);
logger.info("remove task:{} from queue", taskQueueStr);
int retryTimes = 30;
// mainly to wait for the master insert task to succeed
while (taskInstance == null && retryTimes > 0) {
taskInstance = processDao.findTaskInstanceById(taskId);
if (taskInstance == null ) {
logger.error("task instance is null. task id : {} ", taskId);
// set execute task worker host
if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){
taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr);
logger.info("remove task:{} from queue", taskQueueStr);
// set execute task worker host
// get process instance
ProcessInstance processInstance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
// get process define
ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId());
// get process instance
ProcessInstance processInstance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
// get process define
ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId());
// get local execute path
String execLocalPath = FileUtils.getProcessExecDir(processDefine.getProjectId(),
logger.info("task instance local execute path : {} ", execLocalPath);
// get local execute path
String execLocalPath = FileUtils.getProcessExecDir(processDefine.getProjectId(),
logger.info("task instance local execute path : {} ", execLocalPath);
// set task execute path
// check and create Linux users
processInstance.getTenantCode(), logger);
// set task execute path
logger.info("task : {} ready to submit to task scheduler thread",taskId);
// submit task
workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));
// check and create Linux users
processInstance.getTenantCode(), logger);
logger.info("task : {} ready to submit to task scheduler thread",taskId);
// submit task
workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));
}catch (Exception e){
logger.error("fetch task thread exception : " + e.getMessage(),e);
finally {
}finally {
if (mutex != null){
try {
......@@ -246,4 +245,18 @@ public class FetchTaskThread implements Runnable{
* @param poolExecutor
* @return
private boolean checkThreadCount(ThreadPoolExecutor poolExecutor) {
int activeCount = poolExecutor.getActiveCount();
if (activeCount >= workerExecNums) {
logger.info("thread insufficient , activeCount : {} , workerExecNums : {}, will sleep : {} millis for thread resource", activeCount, workerExecNums, Constants.SLEEP_TIME_MILLIS);
return false;
return true;
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册