MasterBaseTaskExecThread.java 10.0 KB
Newer Older
L
ligang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Q
qiaozhanwei 已提交
17
package org.apache.dolphinscheduler.server.master.runner;
L
ligang 已提交
18

T
Technoboy- 已提交
19
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
T
Tboy 已提交
20 21
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
Q
qiaozhanwei 已提交
22
import org.apache.dolphinscheduler.dao.AlertDao;
T
Tboy 已提交
23
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
Q
qiaozhanwei 已提交
24 25
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
T
Tboy 已提交
26
import org.apache.dolphinscheduler.dao.entity.Tenant;
Q
qiaozhanwei 已提交
27
import org.apache.dolphinscheduler.dao.utils.BeanContext;
Q
qiaozhanwei 已提交
28
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
T
Technoboy- 已提交
29 30 31
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
T
updates  
Technoboy- 已提交
32
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
Q
qiaozhanwei 已提交
33 34 35
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Address;
Q
qiaozhanwei 已提交
36
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
T
Tboy 已提交
37
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
B
bao liang 已提交
38
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
39 40 41 42
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.ITaskQueue;
import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
L
ligang 已提交
43 44 45 46 47 48 49 50 51 52
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Callable;

/**
 * master task exec base class
 */
public class MasterBaseTaskExecThread implements Callable<Boolean> {

53 54 55
    /**
     * logger of MasterBaseTaskExecThread
     */
L
ligang 已提交
56 57 58
    private static final Logger logger = LoggerFactory.getLogger(MasterBaseTaskExecThread.class);

    /**
59
     * process service
L
ligang 已提交
60
     */
61
    protected ProcessService processService;
L
ligang 已提交
62 63

    /**
64
     * alert database access
L
ligang 已提交
65 66 67 68 69 70 71 72 73 74 75 76 77 78
     */
    protected AlertDao alertDao;

    /**
     * process instance
     */
    protected ProcessInstance processInstance;

    /**
     * task instance
     */
    protected TaskInstance taskInstance;

    /**
79
     * task queue
L
ligang 已提交
80 81
     */
    protected ITaskQueue taskQueue;
82 83 84 85

    /**
     * whether need cancel
     */
L
ligang 已提交
86 87 88
    protected boolean cancel;

    /**
B
bao liang 已提交
89
     * master config
L
ligang 已提交
90
     */
B
bao liang 已提交
91
    private MasterConfig masterConfig;
L
ligang 已提交
92

Q
qiaozhanwei 已提交
93 94 95 96

    /**
     *  netty remoting client
     */
Q
qiaozhanwei 已提交
97
    private static final NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(new NettyClientConfig());
Q
qiaozhanwei 已提交
98

99 100 101 102 103
    /**
     * constructor of MasterBaseTaskExecThread
     * @param taskInstance      task instance
     * @param processInstance   process instance
     */
L
ligang 已提交
104
    public MasterBaseTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){
105
        this.processService = BeanContext.getBean(ProcessService.class);
L
ligang 已提交
106 107 108 109 110
        this.alertDao = BeanContext.getBean(AlertDao.class);
        this.processInstance = processInstance;
        this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
        this.cancel = false;
        this.taskInstance = taskInstance;
B
bao liang 已提交
111
        this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
L
ligang 已提交
112 113
    }

114 115 116 117
    /**
     * get task instance
     * @return TaskInstance
     */
L
ligang 已提交
118 119 120 121
    public TaskInstance getTaskInstance(){
        return this.taskInstance;
    }

122 123 124
    /**
     * kill master base task exec thread
     */
L
ligang 已提交
125 126 127 128
    public void kill(){
        this.cancel = true;
    }

Q
qiaozhanwei 已提交
129

Q
qiaozhanwei 已提交
130
    // TODO send task to worker
T
Technoboy- 已提交
131
    public void sendToWorker(TaskInstance taskInstance){
Q
qiaozhanwei 已提交
132
        final Address address = new Address("127.0.0.1", 12346);
T
Tboy 已提交
133 134

        ExecuteTaskRequestCommand taskRequestCommand = new ExecuteTaskRequestCommand(
T
Tboy 已提交
135
                FastJsonSerializer.serializeToString(getTaskExecutionContext(taskInstance)));
Q
qiaozhanwei 已提交
136
        try {
T
Tboy 已提交
137
            Command responseCommand = nettyRemotingClient.sendSync(address,
T
updates  
Technoboy- 已提交
138
                    taskRequestCommand.convert2Command(), 2000);
T
Tboy 已提交
139 140 141 142

            ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(
                    responseCommand.getBody(), ExecuteTaskAckCommand.class);

T
Technoboy- 已提交
143 144 145 146 147 148 149 150
            logger.info("taskAckCommand : {}",taskAckCommand);
            processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()),
                    taskAckCommand.getStartTime(),
                    taskAckCommand.getHost(),
                    taskAckCommand.getExecutePath(),
                    taskAckCommand.getLogPath(),
                    taskInstance.getId());

Q
qiaozhanwei 已提交
151 152 153 154 155
        } catch (InterruptedException | RemotingException ex) {
            logger.error(String.format("send command to : %s error", address), ex);
        }
    }

T
Tboy 已提交
156
    /**
T
Tboy 已提交
157
     * get TaskExecutionContext
T
Tboy 已提交
158 159
     *
     * @param taskInstance taskInstance
T
Tboy 已提交
160
     * @return TaskExecutionContext
T
Tboy 已提交
161
     */
T
Tboy 已提交
162
    private TaskExecutionContext getTaskExecutionContext(TaskInstance taskInstance){
T
Tboy 已提交
163 164
        taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstance.getId());

T
Tboy 已提交
165
        Integer userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
T
Tboy 已提交
166
        Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
T
Tboy 已提交
167

T
Tboy 已提交
168 169
        // verify tenant is null
        if (verifyTenantIsNull(tenant, taskInstance)) {
T
Tboy 已提交
170 171 172 173 174 175
            processService.changeTaskState(ExecutionStatus.FAILURE,
                    taskInstance.getStartTime(),
                    taskInstance.getHost(),
                    null,
                    null,
                    taskInstance.getId());
T
Tboy 已提交
176 177 178 179 180 181 182
            return null;
        }
        // set queue for process instance, user-specified queue takes precedence over tenant queue
        String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
        taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
        taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());

T
Tboy 已提交
183 184 185 186 187
        return TaskExecutionContextBuilder.get()
                .buildTaskInstanceRelatedInfo(taskInstance)
                .buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
                .buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
                .create();
T
Tboy 已提交
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
    }


    /**
     *  whehter tenant is null
     * @param tenant tenant
     * @param taskInstance taskInstance
     * @return result
     */
    private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
        if(tenant == null){
            logger.error("tenant not exists,process instance id : {},task instance id : {}",
                    taskInstance.getProcessInstance().getId(),
                    taskInstance.getId());
            return true;
        }
        return false;
    }

    /**
     * get execute local path
     *
     * @return execute local path
     */
    private String getExecLocalPath(TaskInstance taskInstance){
        return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
                taskInstance.getProcessDefine().getId(),
                taskInstance.getProcessInstance().getId(),
                taskInstance.getId());
    }

219 220 221 222
    /**
     * submit master base task exec thread
     * @return TaskInstance
     */
L
ligang 已提交
223
    protected TaskInstance submit(){
B
bao liang 已提交
224 225
        Integer commitRetryTimes = masterConfig.getMasterTaskCommitRetryTimes();
        Integer commitRetryInterval = masterConfig.getMasterTaskCommitInterval();
L
ligang 已提交
226 227

        int retryTimes = 1;
228 229
        boolean submitDB = false;
        boolean submitQueue = false;
230
        TaskInstance task = null;
231
        while (retryTimes <= commitRetryTimes){
L
ligang 已提交
232
            try {
233
                if(!submitDB){
234
                    // submit task to db
235
                    task = processService.submitTask(taskInstance, processInstance);
236
                    if(task != null && task.getId() != 0){
237
                        submitDB = true;
238 239
                    }
                }
240
                if(submitDB && !submitQueue){
241
                    // submit task to queue
T
Technoboy- 已提交
242
                    sendToWorker(task);
Q
qiaozhanwei 已提交
243
                    submitQueue = true;
244
                }
245
                if(submitDB && submitQueue){
L
ligang 已提交
246 247
                    return task;
                }
248 249 250 251
                if(!submitDB){
                    logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes);
                }else if(!submitQueue){
                    logger.error("task commit to queue failed , taskId {} has already retry {} times, please check the queue", taskInstance.getId(), retryTimes);
252
                }
L
ligang 已提交
253 254
                Thread.sleep(commitRetryInterval);
            } catch (Exception e) {
Y
Yelli 已提交
255
                logger.error("task commit to mysql and queue failed",e);
L
ligang 已提交
256 257 258
            }
            retryTimes += 1;
        }
259
        return task;
L
ligang 已提交
260 261
    }

262 263 264 265
    /**
     * submit wait complete
     * @return true
     */
L
ligang 已提交
266 267 268 269
    protected Boolean submitWaitComplete(){
        return true;
    }

270 271 272 273 274
    /**
     * call
     * @return boolean
     * @throws Exception exception
     */
L
ligang 已提交
275 276 277 278 279 280
    @Override
    public Boolean call() throws Exception {
        return submitWaitComplete();
    }

}