TaskExecuteProcessor.java 8.7 KB
Newer Older
T
Tboy 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.server.worker.processor;

20 21 22 23 24 25

import java.util.Date;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;

T
Tboy 已提交
26 27 28
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
T
Tboy 已提交
29
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
30 31 32 33 34 35
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.common.utils.RetryerUtils;
T
Tboy 已提交
36 37
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
T
Tboy 已提交
38 39
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
T
Tboy 已提交
40
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
S
simon824 已提交
41
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
Q
qiaozhanwei 已提交
42
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
S
simon 已提交
43
import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
T
Tboy 已提交
44
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
T
Tboy 已提交
45
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
T
Tboy 已提交
46 47 48 49
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

50 51 52 53 54
import com.github.rholder.retry.RetryException;

import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender;
import io.netty.channel.Channel;
T
Tboy 已提交
55

Q
qiaozhanwei 已提交
56 57 58
/**
 *  worker request processor
 */
T
Tboy 已提交
59
public class TaskExecuteProcessor implements NettyRequestProcessor {
T
Tboy 已提交
60

T
Tboy 已提交
61
    private final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class);
T
Tboy 已提交
62

Q
qiaozhanwei 已提交
63 64 65
    /**
     *  thread executor service
     */
T
Tboy 已提交
66 67
    private final ExecutorService workerExecService;

Q
qiaozhanwei 已提交
68 69 70
    /**
     *  worker config
     */
T
Tboy 已提交
71 72
    private final WorkerConfig workerConfig;

Q
qiaozhanwei 已提交
73 74 75 76
    /**
     *  task callback service
     */
    private final TaskCallbackService taskCallbackService;
T
Tboy 已提交
77

78
    public TaskExecuteProcessor(){
T
Tboy 已提交
79
        this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
T
Tboy 已提交
80 81 82 83 84 85
        this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
        this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
    }

    @Override
    public void process(Channel channel, Command command) {
T
Tboy 已提交
86
        Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
Q
qiaozhanwei 已提交
87
                String.format("invalid command type : %s", command.getType()));
Q
qiaozhanwei 已提交
88

S
simon824 已提交
89
        TaskExecuteRequestCommand taskRequestCommand = JsonSerializer.deserialize(
T
Tboy 已提交
90
                command.getBody(), TaskExecuteRequestCommand.class);
Q
qiaozhanwei 已提交
91

Q
qiaozhanwei 已提交
92 93
        logger.info("received command : {}", taskRequestCommand);

94 95 96 97
        if(taskRequestCommand == null){
            logger.error("task execute request command is null");
            return;
        }
Q
qiaozhanwei 已提交
98

99
        String contextJson = taskRequestCommand.getTaskExecutionContext();
张世鸣 已提交
100
        TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class);
101 102 103 104 105
        if(taskExecutionContext == null){
            logger.error("task execution context is null");
            return;
        }

106
        taskExecutionContext.setHost(OSUtils.getHost() + ":" + workerConfig.getListenPort());
Q
qiaozhanwei 已提交
107

108 109 110 111 112 113
        // custom logger
        Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
                taskExecutionContext.getProcessDefineId(),
                taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId()));

T
Tboy 已提交
114
        // local execute path
T
Tboy 已提交
115
        String execLocalPath = getExecLocalPath(taskExecutionContext);
T
Tboy 已提交
116
        logger.info("task instance  local execute path : {} ", execLocalPath);
117

118
        FileUtils.taskLoggerThreadLocal.set(taskLogger);
T
Tboy 已提交
119
        try {
T
Tboy 已提交
120
            FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskExecutionContext.getTenantCode());
121 122 123 124
        } catch (Throwable ex) {
            String errorLog = String.format("create execLocalPath : %s", execLocalPath);
            LoggerUtils.logError(Optional.ofNullable(logger), errorLog, ex);
            LoggerUtils.logError(Optional.ofNullable(taskLogger), errorLog, ex);
T
Tboy 已提交
125
        }
126 127
        FileUtils.taskLoggerThreadLocal.remove();

T
Tboy 已提交
128 129
        taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
                new NettyRemoteChannel(channel, command.getOpaque()));
Q
qiaozhanwei 已提交
130

131 132
        // tell master that task is in executing
        final Command ackCommand = buildAckCommand(taskExecutionContext).convert2Command();
133

Q
qiaozhanwei 已提交
134
        try {
135 136 137 138 139
            RetryerUtils.retryCall(() -> {
                taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(),ackCommand);
                return Boolean.TRUE;
            });
            // submit task
140
            workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger));
141 142
        } catch (ExecutionException | RetryException e) {
            logger.error(e.getMessage(), e);
Q
qiaozhanwei 已提交
143
        }
T
Tboy 已提交
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
    }

    /**
     * get task log path
     * @return log path
     */
    private String getTaskLogPath(TaskExecutionContext taskExecutionContext) {
        String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory())
                .getLogger("ROOT")
                .getAppender("TASKLOGFILE"))
                .getDiscriminator()).getLogBase();
        if (baseLog.startsWith(Constants.SINGLE_SLASH)){
            return baseLog + Constants.SINGLE_SLASH +
                    taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH  +
                    taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH  +
                    taskExecutionContext.getTaskInstanceId() + ".log";
        }
        return System.getProperty("user.dir") + Constants.SINGLE_SLASH +
                baseLog +  Constants.SINGLE_SLASH +
                taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH  +
                taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH  +
                taskExecutionContext.getTaskInstanceId() + ".log";
    }

    /**
     * build ack command
     * @param taskExecutionContext taskExecutionContext
T
Tboy 已提交
171
     * @return TaskExecuteAckCommand
T
Tboy 已提交
172
     */
T
Tboy 已提交
173 174
    private TaskExecuteAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) {
        TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
T
Tboy 已提交
175 176 177
        ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
        ackCommand.setLogPath(getTaskLogPath(taskExecutionContext));
Q
qiaozhanwei 已提交
178
        ackCommand.setHost(taskExecutionContext.getHost());
T
Tboy 已提交
179 180 181 182 183 184
        ackCommand.setStartTime(new Date());
        if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){
            ackCommand.setExecutePath(null);
        }else{
            ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
        }
185
        taskExecutionContext.setLogPath(ackCommand.getLogPath());
T
Tboy 已提交
186 187
        return ackCommand;
    }
T
Tboy 已提交
188

Q
qiaozhanwei 已提交
189
    /**
190
     * get execute local path
T
Tboy 已提交
191
     * @param taskExecutionContext taskExecutionContext
Q
qiaozhanwei 已提交
192 193
     * @return execute local path
     */
T
Tboy 已提交
194 195 196 197
    private String getExecLocalPath(TaskExecutionContext taskExecutionContext){
        return FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(),
                taskExecutionContext.getProcessDefineId(),
                taskExecutionContext.getProcessInstanceId(),
T
Tboy 已提交
198
                taskExecutionContext.getTaskInstanceId());
T
Tboy 已提交
199 200
    }
}