TaskExecuteThread.java 9.3 KB
Newer Older
L
ligang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Q
qiaozhanwei 已提交
17
package org.apache.dolphinscheduler.server.worker.runner;
L
ligang 已提交
18

S
simon824 已提交
19
import com.fasterxml.jackson.core.type.TypeReference;
S
simon 已提交
20
import org.apache.dolphinscheduler.common.utils.*;
L
ligang 已提交
21

Q
qiaozhanwei 已提交
22
import org.apache.dolphinscheduler.common.Constants;
Q
qiaozhanwei 已提交
23 24 25 26
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
Q
qiaozhanwei 已提交
27
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
28
import org.apache.dolphinscheduler.common.utils.*;
T
Tboy 已提交
29
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
Q
qiaozhanwei 已提交
30
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
Q
qiaozhanwei 已提交
31
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
Q
qiaozhanwei 已提交
32 33
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
L
ligang 已提交
34 35 36 37 38 39 40 41 42 43 44
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.*;
import java.util.stream.Collectors;


/**
 *  task scheduler thread
 */
T
Tboy 已提交
45
public class TaskExecuteThread implements Runnable {
L
ligang 已提交
46 47 48 49

    /**
     * logger
     */
T
Tboy 已提交
50
    private final Logger logger = LoggerFactory.getLogger(TaskExecuteThread.class);
L
ligang 已提交
51 52 53 54

    /**
     *  task instance
     */
T
Tboy 已提交
55
    private TaskExecutionContext taskExecutionContext;
L
ligang 已提交
56 57

    /**
journey2018's avatar
journey2018 已提交
58
     *  abstract task
L
ligang 已提交
59 60 61
     */
    private AbstractTask task;

T
Tboy 已提交
62
    /**
T
Tboy 已提交
63
     *  task callback service
T
Tboy 已提交
64
     */
T
Tboy 已提交
65
    private TaskCallbackService taskCallbackService;
T
Tboy 已提交
66

67
    /**
68
     *  constructor
T
Tboy 已提交
69
     * @param taskExecutionContext taskExecutionContext
T
Tboy 已提交
70
     * @param taskCallbackService taskCallbackService
71
     */
72
    public TaskExecuteThread(TaskExecutionContext taskExecutionContext, TaskCallbackService taskCallbackService){
T
Tboy 已提交
73
        this.taskExecutionContext = taskExecutionContext;
T
Tboy 已提交
74
        this.taskCallbackService = taskCallbackService;
L
ligang 已提交
75 76 77
    }

    @Override
journey2018's avatar
journey2018 已提交
78
    public void run() {
L
ligang 已提交
79

T
Tboy 已提交
80
        TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId());
L
ligang 已提交
81
        try {
T
Tboy 已提交
82
            logger.info("script path : {}", taskExecutionContext.getExecutePath());
journey2018's avatar
journey2018 已提交
83
            // task node
张世鸣 已提交
84
            TaskNode taskNode = JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class);
L
ligang 已提交
85

journey2018's avatar
journey2018 已提交
86
            // copy hdfs/minio file to local
87
            downloadResource(taskExecutionContext.getExecutePath(),
88
                    taskExecutionContext.getResources(),
89
                    taskExecutionContext.getTenantCode(),
L
ligang 已提交
90 91
                    logger);

92 93 94 95
            taskExecutionContext.setTaskParams(taskNode.getParams());
            taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
            taskExecutionContext.setDefinedParams(getGlobalParamsMap());

96
            // set task timeout
97
            setTaskTimeout(taskExecutionContext, taskNode);
98

99
            taskExecutionContext.setTaskAppId(String.format("%s_%s_%s",
T
Tboy 已提交
100 101
                    taskExecutionContext.getProcessDefineId(),
                    taskExecutionContext.getProcessInstanceId(),
T
Tboy 已提交
102
                    taskExecutionContext.getTaskInstanceId()));
103 104 105

            // custom logger
            Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
T
Tboy 已提交
106 107
                    taskExecutionContext.getProcessDefineId(),
                    taskExecutionContext.getProcessInstanceId(),
T
Tboy 已提交
108
                    taskExecutionContext.getTaskInstanceId()));
109

110 111 112


            task = TaskManager.newTask(taskExecutionContext,
113 114 115 116 117 118 119 120 121 122 123
                    taskLogger);

            // task init
            task.init();

            // task handle
            task.handle();

            // task result process
            task.after();

T
Tboy 已提交
124 125
            responseCommand.setStatus(task.getExitStatus().getCode());
            responseCommand.setEndTime(new Date());
126 127
            responseCommand.setProcessId(task.getProcessId());
            responseCommand.setAppIds(task.getAppIds());
T
Tboy 已提交
128
            logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
L
ligang 已提交
129
        }catch (Exception e){
journey2018's avatar
journey2018 已提交
130
            logger.error("task scheduler failure", e);
L
ligang 已提交
131
            kill();
T
Tboy 已提交
132 133
            responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
            responseCommand.setEndTime(new Date());
134 135
            responseCommand.setProcessId(task.getProcessId());
            responseCommand.setAppIds(task.getAppIds());
T
Tboy 已提交
136
        } finally {
Q
qiaozhanwei 已提交
137 138 139 140 141 142
            try {
                taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
            }catch (Exception e){
                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
                taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
            }
L
ligang 已提交
143 144 145 146
        }
    }

    /**
journey2018's avatar
journey2018 已提交
147 148 149 150 151 152 153
     * get global paras map
     * @return
     */
    private Map<String, String> getGlobalParamsMap() {
        Map<String,String> globalParamsMap = new HashMap<>(16);

        // global params string
T
Tboy 已提交
154
        String globalParamsStr = taskExecutionContext.getGlobalParams();
journey2018's avatar
journey2018 已提交
155
        if (globalParamsStr != null) {
S
simon824 已提交
156
            List<Property> globalParamsList = new ArrayList<>();
S
simon824 已提交
157
            globalParamsList = JSONUtils.toList(globalParamsStr, Property.class);
journey2018's avatar
journey2018 已提交
158 159 160 161
            globalParamsMap.putAll(globalParamsList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)));
        }
        return globalParamsMap;
    }
162

journey2018's avatar
journey2018 已提交
163 164
    /**
     * set task timeout
165
     * @param taskExecutionContext TaskExecutionContext
L
ligang 已提交
166 167
     * @param taskNode
     */
168
    private void setTaskTimeout(TaskExecutionContext taskExecutionContext, TaskNode taskNode) {
journey2018's avatar
journey2018 已提交
169
        // the default timeout is the maximum value of the integer
170
        taskExecutionContext.setTaskTimeout(Integer.MAX_VALUE);
L
ligang 已提交
171 172
        TaskTimeoutParameter taskTimeoutParameter = taskNode.getTaskTimeoutParameter();
        if (taskTimeoutParameter.getEnable()){
journey2018's avatar
journey2018 已提交
173
            // get timeout strategy
174
            taskExecutionContext.setTaskTimeoutStrategy(taskTimeoutParameter.getStrategy().getCode());
L
ligang 已提交
175 176 177 178 179
            switch (taskTimeoutParameter.getStrategy()){
                case WARN:
                    break;
                case FAILED:
                    if (Integer.MAX_VALUE > taskTimeoutParameter.getInterval() * 60) {
180
                        taskExecutionContext.setTaskTimeout(taskTimeoutParameter.getInterval() * 60);
L
ligang 已提交
181 182 183 184
                    }
                    break;
                case WARNFAILED:
                    if (Integer.MAX_VALUE > taskTimeoutParameter.getInterval() * 60) {
185
                        taskExecutionContext.setTaskTimeout(taskTimeoutParameter.getInterval() * 60);
L
ligang 已提交
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
                    }
                    break;
                default:
                    logger.error("not support task timeout strategy: {}", taskTimeoutParameter.getStrategy());
                    throw new IllegalArgumentException("not support task timeout strategy");

            }
        }
    }


    /**
     *  kill task
     */
    public void kill(){
        if (task != null){
            try {
                task.cancelApplication(true);
            }catch (Exception e){
                logger.error(e.getMessage(),e);
            }
        }
    }


    /**
212
     * download resource file
L
ligang 已提交
213 214 215 216 217
     *
     * @param execLocalPath
     * @param projectRes
     * @param logger
     */
218 219 220 221
    private void downloadResource(String execLocalPath,
                                  List<String> projectRes,
                                  String tenantCode,
                                  Logger logger) throws Exception {
222 223 224 225
        if (CollectionUtils.isEmpty(projectRes)){
            return;
        }

226 227
        for (String resource : projectRes) {
            File resFile = new File(execLocalPath, resource);
L
ligang 已提交
228 229
            if (!resFile.exists()) {
                try {
journey2018's avatar
journey2018 已提交
230
                    // query the tenant code of the resource according to the name of the resource
Q
qiaozhanwei 已提交
231
                    String resHdfsPath = HadoopUtils.getHdfsResourceFileName(tenantCode, resource);
L
ligang 已提交
232 233

                    logger.info("get resource file from hdfs :{}", resHdfsPath);
234
                    HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + resource, false, true);
L
ligang 已提交
235 236 237 238 239 240 241 242 243 244
                }catch (Exception e){
                    logger.error(e.getMessage(),e);
                    throw new RuntimeException(e.getMessage());
                }
            } else {
                logger.info("file : {} exists ", resFile.getName());
            }
        }
    }
}