ConditionsTaskExecThread.java 6.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17
package org.apache.dolphinscheduler.server.master.runner;
18 19 20 21 22 23 24 25

import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
张世鸣 已提交
26
import org.apache.dolphinscheduler.common.utils.*;
27
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
28
import org.apache.dolphinscheduler.common.utils.NetUtils;
29
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
30 31
import org.apache.dolphinscheduler.server.utils.LogUtils;

32
import org.slf4j.LoggerFactory;
33 34

import java.util.ArrayList;
35
import java.util.Date;
36 37 38 39
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

40
public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {
41 42 43 44 45 46 47 48


    /**
     * dependent parameters
     */
    private DependentParameters dependentParameters;

    /**
49
     * complete task map
50
     */
Q
test  
qiaozhanwei 已提交
51 52
    private Map<String, ExecutionStatus> completeTaskList = new ConcurrentHashMap<>();

53
    /**
54
     * condition result
55
     */
56
    private DependResult conditionResult;
57 58

    /**
59
     * constructor of MasterBaseTaskExecThread
60
     *
61
     * @param taskInstance    task instance
62
     */
63 64
    public ConditionsTaskExecThread(TaskInstance taskInstance) {
        super(taskInstance);
65
        taskInstance.setStartTime(new Date());
66 67 68
    }

    @Override
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
    public Boolean submitWaitComplete() {
        try{
            this.taskInstance = submit();
            logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
                    taskInstance.getProcessDefinitionId(),
                    taskInstance.getProcessInstanceId(),
                    taskInstance.getId()));
            String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
            Thread.currentThread().setName(threadLoggerInfoName);
            initTaskParameters();
            logger.info("dependent task start");
            waitTaskQuit();
            updateTaskState();
        }catch (Exception e){
            logger.error("conditions task run exception" , e);
84
        }
85
        return true;
86 87
    }

88 89 90 91 92 93 94
    private void waitTaskQuit() {
        List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(
                taskInstance.getProcessInstanceId()
        );
        for(TaskInstance task : taskInstances){
            completeTaskList.putIfAbsent(task.getName(), task.getState());
        }
95 96 97 98 99 100 101 102 103 104 105

        List<DependResult> modelResultList = new ArrayList<>();
        for(DependentTaskModel dependentTaskModel : dependentParameters.getDependTaskList()){

            List<DependResult> itemDependResult = new ArrayList<>();
            for(DependentItem item : dependentTaskModel.getDependItemList()){
                itemDependResult.add(getDependResultForItem(item));
            }
            DependResult modelResult = DependentUtils.getDependResultForRelation(dependentTaskModel.getRelation(), itemDependResult);
            modelResultList.add(modelResult);
        }
106
        conditionResult = DependentUtils.getDependResultForRelation(
107 108
                dependentParameters.getRelation(), modelResultList
        );
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
        logger.info("the conditions task depend result : {}", conditionResult);
    }

    /**
     *
     */
    private void updateTaskState() {
        ExecutionStatus status;
        if(this.cancel){
            status = ExecutionStatus.KILL;
        }else{
            status = (conditionResult == DependResult.SUCCESS) ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE;
        }
        taskInstance.setState(status);
        taskInstance.setEndTime(new Date());
        processService.updateTaskInstance(taskInstance);
    }

    private void initTaskParameters() {
128
        this.taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance));
129
        this.taskInstance.setHost(NetUtils.getHost() + Constants.COLON + masterConfig.getListenPort());
_和's avatar
_和 已提交
130
        taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
131 132 133 134
        taskInstance.setStartTime(new Date());
        this.processService.saveTaskInstance(taskInstance);

        this.dependentParameters = JSONUtils.parseObject(this.taskInstance.getDependency(), DependentParameters.class);
135 136
    }

137 138 139 140 141 142

    /**
     * depend result for depend item
     * @param item
     * @return
     */
143 144 145 146 147 148 149 150 151 152
    private DependResult getDependResultForItem(DependentItem item){

        DependResult dependResult = DependResult.SUCCESS;
        if(!completeTaskList.containsKey(item.getDepTasks())){
            logger.info("depend item: {} have not completed yet.", item.getDepTasks());
            dependResult = DependResult.FAILED;
            return dependResult;
        }
        ExecutionStatus executionStatus = completeTaskList.get(item.getDepTasks());
        if(executionStatus != item.getStatus()){
153
            logger.info("depend item : {} expect status: {}, actual status: {}" ,item.getDepTasks(), item.getStatus(), executionStatus);
154 155
            dependResult = DependResult.FAILED;
        }
156 157
        logger.info("dependent item complete {} {},{}",
                Constants.DEPENDENT_SPLIT, item.getDepTasks(), dependResult);
158 159 160
        return dependResult;
    }

161 162

}