“374410aa846a9c2a168c45dd81801767f7d9789d”上不存在“docs/api/zh/git@gitcode.net:qq_39816586/three.js.git”
MasterBaseTaskExecThread.java 5.2 KB
Newer Older
L
ligang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Q
qiaozhanwei 已提交
17
package org.apache.dolphinscheduler.server.master.runner;
L
ligang 已提交
18

Q
qiaozhanwei 已提交
19 20
import org.apache.dolphinscheduler.common.queue.ITaskQueue;
import org.apache.dolphinscheduler.common.queue.TaskQueueFactory;
D
dk.technoboy 已提交
21
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
Q
qiaozhanwei 已提交
22 23 24 25 26
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.BeanContext;
B
bao liang 已提交
27
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
L
ligang 已提交
28 29 30 31 32 33 34 35 36 37
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Callable;

/**
 * master task exec base class
 */
public class MasterBaseTaskExecThread implements Callable<Boolean> {

38 39 40
    /**
     * logger of MasterBaseTaskExecThread
     */
L
ligang 已提交
41 42 43
    private static final Logger logger = LoggerFactory.getLogger(MasterBaseTaskExecThread.class);

    /**
44
     * process dao
L
ligang 已提交
45 46 47 48
     */
    protected ProcessDao processDao;

    /**
49
     * alert database access
L
ligang 已提交
50 51 52 53 54 55 56 57 58 59 60 61 62 63
     */
    protected AlertDao alertDao;

    /**
     * process instance
     */
    protected ProcessInstance processInstance;

    /**
     * task instance
     */
    protected TaskInstance taskInstance;

    /**
64
     * task queue
L
ligang 已提交
65 66
     */
    protected ITaskQueue taskQueue;
67 68 69 70

    /**
     * whether need cancel
     */
L
ligang 已提交
71 72 73
    protected boolean cancel;

    /**
B
bao liang 已提交
74
     * master config
L
ligang 已提交
75
     */
B
bao liang 已提交
76
    private MasterConfig masterConfig;
L
ligang 已提交
77

78 79 80 81 82
    /**
     * constructor of MasterBaseTaskExecThread
     * @param taskInstance      task instance
     * @param processInstance   process instance
     */
L
ligang 已提交
83 84 85 86 87 88 89
    public MasterBaseTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){
        this.processDao = BeanContext.getBean(ProcessDao.class);
        this.alertDao = BeanContext.getBean(AlertDao.class);
        this.processInstance = processInstance;
        this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
        this.cancel = false;
        this.taskInstance = taskInstance;
B
bao liang 已提交
90
        this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
L
ligang 已提交
91 92
    }

93 94 95 96
    /**
     * get task instance
     * @return TaskInstance
     */
L
ligang 已提交
97 98 99 100
    public TaskInstance getTaskInstance(){
        return this.taskInstance;
    }

101 102 103
    /**
     * kill master base task exec thread
     */
L
ligang 已提交
104 105 106 107
    public void kill(){
        this.cancel = true;
    }

108 109 110 111
    /**
     * submit master base task exec thread
     * @return TaskInstance
     */
L
ligang 已提交
112
    protected TaskInstance submit(){
B
bao liang 已提交
113 114
        Integer commitRetryTimes = masterConfig.getMasterTaskCommitRetryTimes();
        Integer commitRetryInterval = masterConfig.getMasterTaskCommitInterval();
L
ligang 已提交
115 116

        int retryTimes = 1;
117 118
        boolean submitDB = false;
        boolean submitQueue = false;
119
        TaskInstance task = null;
120
        while (retryTimes <= commitRetryTimes){
L
ligang 已提交
121
            try {
122
                if(!submitDB){
123 124 125
                    // submit task to db
                    task = processDao.submitTask(taskInstance, processInstance);
                    if(task != null && task.getId() != 0){
126
                        submitDB = true;
127 128
                    }
                }
129
                if(submitDB && !submitQueue){
130
                    // submit task to queue
131
                    submitQueue = processDao.submitTaskToQueue(task);
132
                }
133
                if(submitDB && submitQueue){
L
ligang 已提交
134 135
                    return task;
                }
136
                if(!submitDB){
T
add log  
Technoboy- 已提交
137
                    logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes);
T
updates  
Technoboy- 已提交
138
                }else if(!submitQueue){
T
add log  
Technoboy- 已提交
139
                    logger.error("task commit to queue failed , taskId {} has already retry {} times, please check the queue", taskInstance.getId(), retryTimes);
140
                }
L
ligang 已提交
141 142 143 144 145 146
                Thread.sleep(commitRetryInterval);
            } catch (Exception e) {
                logger.error("task commit to mysql and queue failed : " + e.getMessage(),e);
            }
            retryTimes += 1;
        }
147
        return task;
L
ligang 已提交
148 149
    }

150 151 152 153
    /**
     * submit wait complete
     * @return true
     */
L
ligang 已提交
154 155 156 157
    protected Boolean submitWaitComplete(){
        return true;
    }

158 159 160 161 162
    /**
     * call
     * @return boolean
     * @throws Exception exception
     */
L
ligang 已提交
163 164 165 166 167 168
    @Override
    public Boolean call() throws Exception {
        return submitWaitComplete();
    }

}