FetchTaskThread.java 10.2 KB
Newer Older
L
ligang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cn.escheduler.server.worker.runner;

import cn.escheduler.common.Constants;
import cn.escheduler.common.queue.ITaskQueue;
import cn.escheduler.common.thread.Stopper;
import cn.escheduler.common.thread.ThreadUtils;
import cn.escheduler.common.utils.FileUtils;
import cn.escheduler.common.utils.OSUtils;
25
import cn.escheduler.common.zk.AbstractZKClient;
L
ligang 已提交
26
import cn.escheduler.dao.ProcessDao;
27
import cn.escheduler.dao.model.*;
L
ligang 已提交
28 29
import cn.escheduler.server.zk.ZKWorkerClient;
import org.apache.commons.configuration.Configuration;
30 31
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
L
ligang 已提交
32 33 34 35
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

B
baoliang 已提交
36
import java.util.Arrays;
L
ligang 已提交
37
import java.util.Date;
B
baoliang 已提交
38
import java.util.List;
L
ligang 已提交
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

/**
 *  fetch task thread
 */
public class FetchTaskThread implements Runnable{

    private static final Logger logger = LoggerFactory.getLogger(FetchTaskThread.class);
    /**
     *  set worker concurrent tasks
     */
    private final int taskNum;

    /**
     *  zkWorkerClient
     */
    private final ZKWorkerClient zkWorkerClient;

    /**
     * task queue impl
     */
    protected ITaskQueue taskQueue;

    /**
     *  process database access
     */
    private final ProcessDao processDao;

    /**
     *  worker thread pool executor
     */
    private final ExecutorService workerExecService;

    /**
     *  worker exec nums
     */
    private int workerExecNums;

    private Configuration conf;


    public FetchTaskThread(int taskNum, ZKWorkerClient zkWorkerClient,
                           ProcessDao processDao, Configuration conf,
                           ITaskQueue taskQueue){
        this.taskNum = taskNum;
        this.zkWorkerClient = zkWorkerClient;
        this.processDao = processDao;
        this.workerExecNums = conf.getInt(Constants.WORKER_EXEC_THREADS,
                Constants.defaultWorkerExecThreadNum);
        // worker thread pool executor
        this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Fetch-Task-Thread",workerExecNums);
        this.conf = conf;
        this.taskQueue = taskQueue;
    }

B
baoliang 已提交
95 96 97 98 99 100 101 102
    /**
     * Check if the task runs on this worker
     * @param taskInstance
     * @param host
     * @return
     */
    private boolean checkWorkerGroup(TaskInstance taskInstance, String host){

103
        int taskWorkerGroupId = processDao.getTaskWorkerGroupId(taskInstance);
B
baoliang 已提交
104

B
baoliang 已提交
105
        if(taskWorkerGroupId <= 0){
B
baoliang 已提交
106 107
            return true;
        }
B
baoliang 已提交
108
        WorkerGroup workerGroup = processDao.queryWorkerGroupById(taskWorkerGroupId);
B
baoliang 已提交
109 110 111 112 113
        if(workerGroup == null ){
            logger.info("task {} cannot find the worker group, use all worker instead.", taskInstance.getId());
            return true;
        }
        String ips = workerGroup.getIpList();
114
        if(StringUtils.isBlank(ips)){
B
baoliang 已提交
115 116 117
            logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers",
                    taskInstance.getId(), workerGroup.getId());
        }
118
        String[] ipArray = ips.split(Constants.COMMA);
B
baoliang 已提交
119 120 121 122
        List<String> ipList =  Arrays.asList(ipArray);
        return ipList.contains(host);
    }

L
ligang 已提交
123

124 125


L
ligang 已提交
126 127 128 129 130 131
    @Override
    public void run() {

        while (Stopper.isRunning()){
            InterProcessMutex mutex = null;
            try {
132
                ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService;
L
ligang 已提交
133

134 135
                //check memory and cpu usage and threads
                if(OSUtils.checkResource(this.conf, false) && checkThreadCount(poolExecutor)) {
L
ligang 已提交
136

137 138 139 140 141 142 143
                    //whether have tasks, if no tasks , no need lock  //get all tasks
                    List<String> tasksQueueList = taskQueue.getAllTasks(Constants.SCHEDULER_TASKS_QUEUE);
                    if(tasksQueueList.size() > 0){
                        // creating distributed locks, lock path /escheduler/lock/worker
                        String zNodeLockPath = zkWorkerClient.getWorkerLockPath();
                        mutex = new InterProcessMutex(zkWorkerClient.getZkClient(), zNodeLockPath);
                        mutex.acquire();
L
ligang 已提交
144 145

                        // task instance id str
146
                        List<String> taskQueueStrArr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, taskNum);
L
ligang 已提交
147

148 149
                        for(String taskQueueStr : taskQueueStrArr){
                            if (StringUtils.isNotBlank(taskQueueStr )) {
L
ligang 已提交
150

151 152 153
                                if (!checkThreadCount(poolExecutor)) {
                                    break;
                                }
L
ligang 已提交
154

155
                                String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE);
156
                                String taskInstIdStr = taskStringArray[3];
157 158
                                Date now = new Date();
                                Integer taskId = Integer.parseInt(taskInstIdStr);
L
ligang 已提交
159

160 161
                                // find task instance by task id
                                TaskInstance taskInstance = processDao.findTaskInstanceById(taskId);
L
ligang 已提交
162

163
                                logger.info("worker fetch taskId : {} from queue ", taskId);
L
ligang 已提交
164

165 166 167 168 169 170 171 172 173 174 175 176
                                int retryTimes = 30;
                                // mainly to wait for the master insert task to succeed
                                while (taskInstance == null && retryTimes > 0) {
                                    Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                                    taskInstance = processDao.findTaskInstanceById(taskId);
                                    retryTimes--;
                                }

                                if (taskInstance == null ) {
                                    logger.error("task instance is null. task id : {} ", taskId);
                                    continue;
                                }
L
ligang 已提交
177

178 179 180 181 182
                                if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){
                                    continue;
                                }
                                taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr);
                                logger.info("remove task:{} from queue", taskQueueStr);
L
ligang 已提交
183

184 185 186
                                // set execute task worker host
                                taskInstance.setHost(OSUtils.getHost());
                                taskInstance.setStartTime(now);
L
ligang 已提交
187 188


189 190
                                // get process instance
                                ProcessInstance processInstance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
191

192 193
                                // get process define
                                ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId());
L
ligang 已提交
194 195


196 197
                                taskInstance.setProcessInstance(processInstance);
                                taskInstance.setProcessDefine(processDefine);
L
ligang 已提交
198 199


200 201 202 203 204 205
                                // get local execute path
                                String execLocalPath = FileUtils.getProcessExecDir(processDefine.getProjectId(),
                                        processDefine.getId(),
                                        processInstance.getId(),
                                        taskInstance.getId());
                                logger.info("task instance  local execute path : {} ", execLocalPath);
L
ligang 已提交
206 207


208 209
                                // set task execute path
                                taskInstance.setExecutePath(execLocalPath);
L
ligang 已提交
210

journey2018's avatar
update  
journey2018 已提交
211 212
                            Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(),
                                    processDefine.getUserId());
leon-baoliang's avatar
leon-baoliang 已提交
213 214 215 216 217
                            if(tenant == null){
                                logger.error("cannot find suitable tenant for the task:{}, process instance tenant:{}, process definition tenant:{}",
                                        taskInstance.getName(),processInstance.getTenantId(), processDefine.getTenantId());
                                continue;
                            }
journey2018's avatar
update  
journey2018 已提交
218

L
ligang 已提交
219 220
                            // check and create Linux users
                            FileUtils.createWorkDirAndUserIfAbsent(execLocalPath,
journey2018's avatar
update  
journey2018 已提交
221
                                    tenant.getTenantCode(), logger);
L
ligang 已提交
222

223 224 225
                                logger.info("task : {} ready to submit to task scheduler thread",taskId);
                                // submit task
                                workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));
L
lidongdai 已提交
226

227
                            }
L
ligang 已提交
228
                        }
L
lidongdai 已提交
229

L
ligang 已提交
230
                    }
231

L
ligang 已提交
232 233 234 235 236 237
                }

                Thread.sleep(Constants.SLEEP_TIME_MILLIS);

            }catch (Exception e){
                logger.error("fetch task thread exception : " + e.getMessage(),e);
238
            }finally {
239
                AbstractZKClient.releaseMutex(mutex);
L
ligang 已提交
240 241 242
            }
        }
    }
243 244 245 246 247 248 249 250 251 252 253 254 255 256

    /**
     *
     * @param poolExecutor
     * @return
     */
    private boolean checkThreadCount(ThreadPoolExecutor poolExecutor) {
        int activeCount = poolExecutor.getActiveCount();
        if (activeCount >= workerExecNums) {
            logger.info("thread insufficient , activeCount : {} , workerExecNums : {}, will sleep : {} millis for thread resource", activeCount, workerExecNums, Constants.SLEEP_TIME_MILLIS);
            return false;
        }
        return true;
    }
L
ligang 已提交
257
}