/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.server.master.runner;

import org.apache.dolphinscheduler.common.queue.ITaskQueue;
import org.apache.dolphinscheduler.common.queue.TaskQueueFactory;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.BeanContext;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Callable;

/**
 * master task exec base class
 */
public class MasterBaseTaskExecThread implements Callable<Boolean> {

38 39 40
    /**
     * logger of MasterBaseTaskExecThread
     */
L
ligang 已提交
41 42 43
    private static final Logger logger = LoggerFactory.getLogger(MasterBaseTaskExecThread.class);

    /**
44
     * process dao
L
ligang 已提交
45 46 47 48
     */
    protected ProcessDao processDao;

    /**
49
     * alert database access
L
ligang 已提交
50 51 52 53 54 55 56 57 58 59 60 61 62 63
     */
    protected AlertDao alertDao;

    /**
     * process instance
     */
    protected ProcessInstance processInstance;

    /**
     * task instance
     */
    protected TaskInstance taskInstance;

    /**
64
     * task queue
L
ligang 已提交
65 66
     */
    protected ITaskQueue taskQueue;
67 68 69 70

    /**
     * whether need cancel
     */
L
ligang 已提交
71 72 73
    protected boolean cancel;

    /**
B
bao liang 已提交
74
     * master config
L
ligang 已提交
75
     */
B
bao liang 已提交
76
    private MasterConfig masterConfig;
L
ligang 已提交
77

78 79 80 81 82
    /**
     * constructor of MasterBaseTaskExecThread
     * @param taskInstance      task instance
     * @param processInstance   process instance
     */
L
ligang 已提交
83 84 85 86 87 88 89
    public MasterBaseTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){
        this.processDao = BeanContext.getBean(ProcessDao.class);
        this.alertDao = BeanContext.getBean(AlertDao.class);
        this.processInstance = processInstance;
        this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
        this.cancel = false;
        this.taskInstance = taskInstance;
B
bao liang 已提交
90
        this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
L
ligang 已提交
91 92
    }

93 94 95 96
    /**
     * get task instance
     * @return TaskInstance
     */
L
ligang 已提交
97 98 99 100
    public TaskInstance getTaskInstance(){
        return this.taskInstance;
    }

101 102 103
    /**
     * kill master base task exec thread
     */
L
ligang 已提交
104 105 106 107
    public void kill(){
        this.cancel = true;
    }

108 109 110 111
    /**
     * submit master base task exec thread
     * @return TaskInstance
     */
L
ligang 已提交
112
    protected TaskInstance submit(){
B
bao liang 已提交
113 114
        Integer commitRetryTimes = masterConfig.getMasterTaskCommitRetryTimes();
        Integer commitRetryInterval = masterConfig.getMasterTaskCommitInterval();
L
ligang 已提交
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133

        int retryTimes = 1;

        while (retryTimes <= commitRetryTimes){
            try {
                TaskInstance task = processDao.submitTask(taskInstance, processInstance);
                if(task != null){
                    return task;
                }
                logger.error("task commit to mysql and queue failed , task has already retry {} times, please check the database", commitRetryTimes);
                Thread.sleep(commitRetryInterval);
            } catch (Exception e) {
                logger.error("task commit to mysql and queue failed : " + e.getMessage(),e);
            }
            retryTimes += 1;
        }
        return null;
    }

134 135 136 137
    /**
     * submit wait complete
     * @return true
     */
L
ligang 已提交
138 139 140 141
    protected Boolean submitWaitComplete(){
        return true;
    }

142 143 144 145 146
    /**
     * call
     * @return boolean
     * @throws Exception exception
     */
L
ligang 已提交
147 148 149 150 151 152
    @Override
    public Boolean call() throws Exception {
        return submitWaitComplete();
    }

}