TaskExecuteThread.java 9.2 KB
Newer Older
L
ligang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Q
qiaozhanwei 已提交
17
package org.apache.dolphinscheduler.server.worker.runner;
L
ligang 已提交
18

19 20 21 22 23 24
import java.io.File;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
L
ligang 已提交
25

Q
qiaozhanwei 已提交
26
import org.apache.dolphinscheduler.common.Constants;
Q
qiaozhanwei 已提交
27 28 29 30
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
Q
qiaozhanwei 已提交
31
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
32 33 34 35
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
T
Tboy 已提交
36
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
Q
qiaozhanwei 已提交
37
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
Q
qiaozhanwei 已提交
38
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
Q
qiaozhanwei 已提交
39 40
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
L
ligang 已提交
41 42 43 44 45 46 47
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Runnable that executes one task instance and sends the result through the task callback service.
 */
T
Tboy 已提交
48
public class TaskExecuteThread implements Runnable {
L
ligang 已提交
49 50 51 52

    /**
     * logger
     */
T
Tboy 已提交
53
    private final Logger logger = LoggerFactory.getLogger(TaskExecuteThread.class);
L
ligang 已提交
54 55 56 57

    /**
     *  task instance
     */
T
Tboy 已提交
58
    private TaskExecutionContext taskExecutionContext;
L
ligang 已提交
59 60

    /**
journey2018's avatar
journey2018 已提交
61
     *  abstract task
L
ligang 已提交
62 63 64
     */
    private AbstractTask task;

T
Tboy 已提交
65
    /**
T
Tboy 已提交
66
     *  task callback service
T
Tboy 已提交
67
     */
T
Tboy 已提交
68
    private TaskCallbackService taskCallbackService;
T
Tboy 已提交
69

70 71 72 73 74
    /**
     * task logger
     */
    private Logger taskLogger;

75
    /**
     * Creates an execute thread for a single task execution context.
     *
     * @param taskExecutionContext context of the task instance to execute
     * @param taskCallbackService service used to send the execution result
     * @param taskLogger logger passed on to the task that is created in run()
     */
    public TaskExecuteThread(TaskExecutionContext taskExecutionContext,
                             TaskCallbackService taskCallbackService,
                             Logger taskLogger) {
        this.taskLogger = taskLogger;
        this.taskCallbackService = taskCallbackService;
        this.taskExecutionContext = taskExecutionContext;
    }

    @Override
journey2018's avatar
journey2018 已提交
89
    public void run() {
L
ligang 已提交
90

T
Tboy 已提交
91
        TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId());
L
ligang 已提交
92
        try {
T
Tboy 已提交
93
            logger.info("script path : {}", taskExecutionContext.getExecutePath());
journey2018's avatar
journey2018 已提交
94
            // task node
张世鸣 已提交
95
            TaskNode taskNode = JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class);
L
ligang 已提交
96

journey2018's avatar
journey2018 已提交
97
            // copy hdfs/minio file to local
98
            downloadResource(taskExecutionContext.getExecutePath(),
99
                    taskExecutionContext.getResources(),
100
                    taskExecutionContext.getTenantCode(),
L
ligang 已提交
101 102
                    logger);

103 104 105 106
            taskExecutionContext.setTaskParams(taskNode.getParams());
            taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
            taskExecutionContext.setDefinedParams(getGlobalParamsMap());

107
            // set task timeout
108
            setTaskTimeout(taskExecutionContext, taskNode);
109

110
            taskExecutionContext.setTaskAppId(String.format("%s_%s_%s",
T
Tboy 已提交
111 112
                    taskExecutionContext.getProcessDefineId(),
                    taskExecutionContext.getProcessInstanceId(),
T
Tboy 已提交
113
                    taskExecutionContext.getTaskInstanceId()));
114

115
            task = TaskManager.newTask(taskExecutionContext, taskLogger);
116 117 118 119 120 121 122 123 124 125

            // task init
            task.init();

            // task handle
            task.handle();

            // task result process
            task.after();

T
Tboy 已提交
126 127
            responseCommand.setStatus(task.getExitStatus().getCode());
            responseCommand.setEndTime(new Date());
128 129
            responseCommand.setProcessId(task.getProcessId());
            responseCommand.setAppIds(task.getAppIds());
T
Tboy 已提交
130
            logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
L
ligang 已提交
131
        }catch (Exception e){
journey2018's avatar
journey2018 已提交
132
            logger.error("task scheduler failure", e);
L
ligang 已提交
133
            kill();
T
Tboy 已提交
134 135
            responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
            responseCommand.setEndTime(new Date());
136 137
            responseCommand.setProcessId(task.getProcessId());
            responseCommand.setAppIds(task.getAppIds());
T
Tboy 已提交
138
        } finally {
Q
qiaozhanwei 已提交
139 140 141 142 143 144
            try {
                taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
            }catch (Exception e){
                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
                taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
            }
L
ligang 已提交
145 146 147 148
        }
    }

    /**
journey2018's avatar
journey2018 已提交
149 150 151 152 153 154 155
     * get global paras map
     * @return
     */
    private Map<String, String> getGlobalParamsMap() {
        Map<String,String> globalParamsMap = new HashMap<>(16);

        // global params string
T
Tboy 已提交
156
        String globalParamsStr = taskExecutionContext.getGlobalParams();
journey2018's avatar
journey2018 已提交
157
        if (globalParamsStr != null) {
158
            List<Property> globalParamsList = JSONUtils.toList(globalParamsStr, Property.class);
journey2018's avatar
journey2018 已提交
159 160 161 162
            globalParamsMap.putAll(globalParamsList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)));
        }
        return globalParamsMap;
    }
163

journey2018's avatar
journey2018 已提交
164 165
    /**
     * set task timeout
166
     * @param taskExecutionContext TaskExecutionContext
L
ligang 已提交
167 168
     * @param taskNode
     */
169
    private void setTaskTimeout(TaskExecutionContext taskExecutionContext, TaskNode taskNode) {
journey2018's avatar
journey2018 已提交
170
        // the default timeout is the maximum value of the integer
171
        taskExecutionContext.setTaskTimeout(Integer.MAX_VALUE);
L
ligang 已提交
172 173
        TaskTimeoutParameter taskTimeoutParameter = taskNode.getTaskTimeoutParameter();
        if (taskTimeoutParameter.getEnable()){
journey2018's avatar
journey2018 已提交
174
            // get timeout strategy
175
            taskExecutionContext.setTaskTimeoutStrategy(taskTimeoutParameter.getStrategy().getCode());
L
ligang 已提交
176 177 178 179 180
            switch (taskTimeoutParameter.getStrategy()){
                case WARN:
                    break;
                case FAILED:
                    if (Integer.MAX_VALUE > taskTimeoutParameter.getInterval() * 60) {
181
                        taskExecutionContext.setTaskTimeout(taskTimeoutParameter.getInterval() * 60);
L
ligang 已提交
182 183 184 185
                    }
                    break;
                case WARNFAILED:
                    if (Integer.MAX_VALUE > taskTimeoutParameter.getInterval() * 60) {
186
                        taskExecutionContext.setTaskTimeout(taskTimeoutParameter.getInterval() * 60);
L
ligang 已提交
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
                    }
                    break;
                default:
                    logger.error("not support task timeout strategy: {}", taskTimeoutParameter.getStrategy());
                    throw new IllegalArgumentException("not support task timeout strategy");

            }
        }
    }


    /**
     *  Cancels the task if one has been created.
     *  An exception raised while cancelling is logged and not rethrown.
     */
    public void kill() {
        if (task == null) {
            return;
        }
        try {
            task.cancelApplication(true);
        } catch (Exception cancelFailure) {
            logger.error(cancelFailure.getMessage(), cancelFailure);
        }
    }


    /**
213
     * download resource file
L
ligang 已提交
214 215 216 217 218
     *
     * @param execLocalPath
     * @param projectRes
     * @param logger
     */
219 220 221 222
    private void downloadResource(String execLocalPath,
                                  List<String> projectRes,
                                  String tenantCode,
                                  Logger logger) throws Exception {
223 224 225 226
        if (CollectionUtils.isEmpty(projectRes)){
            return;
        }

227 228
        for (String resource : projectRes) {
            File resFile = new File(execLocalPath, resource);
L
ligang 已提交
229 230
            if (!resFile.exists()) {
                try {
journey2018's avatar
journey2018 已提交
231
                    // query the tenant code of the resource according to the name of the resource
Q
qiaozhanwei 已提交
232
                    String resHdfsPath = HadoopUtils.getHdfsResourceFileName(tenantCode, resource);
L
ligang 已提交
233 234

                    logger.info("get resource file from hdfs :{}", resHdfsPath);
235
                    HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + resource, false, true);
L
ligang 已提交
236 237 238 239 240 241 242 243 244 245
                }catch (Exception e){
                    logger.error(e.getMessage(),e);
                    throw new RuntimeException(e.getMessage());
                }
            } else {
                logger.info("file : {} exists ", resFile.getName());
            }
        }
    }
}