AlertManager.java 9.8 KB
Newer Older
L
ligang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Q
qiaozhanwei 已提交
17

18
package org.apache.dolphinscheduler.server.utils;
Q
qiaozhanwei 已提交
19 20 21 22 23

import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ShowType;
import org.apache.dolphinscheduler.common.enums.WarningType;
24
import org.apache.dolphinscheduler.common.utils.JSONUtils;
Q
qiaozhanwei 已提交
25 26 27
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.entity.Alert;
28
import org.apache.dolphinscheduler.dao.entity.ProcessAlertContent;
Q
qiaozhanwei 已提交
29 30 31
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
L
ligang 已提交
32 33 34 35 36

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

37 38 39
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

L
ligang 已提交
40 41 42 43 44
/**
 * alert manager
 */
public class AlertManager {

45 46 47
    /**
     * logger of AlertManager
     */
L
ligang 已提交
48 49
    private static final Logger logger = LoggerFactory.getLogger(AlertManager.class);

50 51 52
    /**
     * alert dao
     */
53
    private final AlertDao alertDao = DaoFactory.getDaoInstance(AlertDao.class);
L
ligang 已提交
54 55

    /**
56 57 58 59
     * command type convert chinese
     *
     * @param commandType command type
     * @return command name
L
ligang 已提交
60 61 62 63
     */
    private String getCommandCnName(CommandType commandType) {
        switch (commandType) {
            case RECOVER_TOLERANCE_FAULT_PROCESS:
B
baoliang 已提交
64
                return "recover tolerance fault process";
L
ligang 已提交
65
            case RECOVER_SUSPENDED_PROCESS:
B
baoliang 已提交
66
                return "recover suspended process";
L
ligang 已提交
67
            case START_CURRENT_TASK_PROCESS:
B
baoliang 已提交
68
                return "start current task process";
L
ligang 已提交
69
            case START_FAILURE_TASK_PROCESS:
B
baoliang 已提交
70
                return "start failure task process";
L
ligang 已提交
71
            case START_PROCESS:
B
baoliang 已提交
72
                return "start process";
L
ligang 已提交
73
            case REPEAT_RUNNING:
B
baoliang 已提交
74
                return "repeat running";
L
ligang 已提交
75
            case SCHEDULER:
B
baoliang 已提交
76
                return "scheduler";
L
ligang 已提交
77
            case COMPLEMENT_DATA:
B
baoliang 已提交
78
                return "complement data";
L
ligang 已提交
79
            case PAUSE:
B
baoliang 已提交
80
                return "pause";
L
ligang 已提交
81
            case STOP:
B
baoliang 已提交
82
                return "stop";
L
ligang 已提交
83
            default:
B
baoliang 已提交
84
                return "unknown type";
L
ligang 已提交
85 86 87 88
        }
    }

    /**
89
     * get process instance content
90 91 92
     *
     * @param processInstance process instance
     * @param taskInstances task instance list
93
     * @return process instance format content
L
ligang 已提交
94 95
     */
    public String getContentProcessInstance(ProcessInstance processInstance,
96
                                            List<TaskInstance> taskInstances) {
L
ligang 已提交
97 98

        String res = "";
99
        if (processInstance.getState().typeIsSuccess()) {
100 101 102 103 104 105 106 107 108 109 110 111 112
            List<ProcessAlertContent> successTaskList = new ArrayList<>(1);
            ProcessAlertContent processAlertContent = ProcessAlertContent.newBuilder()
                    .processId(processInstance.getId())
                    .processName(processInstance.getName())
                    .processType(processInstance.getCommandType())
                    .processState(processInstance.getState())
                    .recovery(processInstance.getRecovery())
                    .runTimes(processInstance.getRunTimes())
                    .processStartTime(processInstance.getStartTime())
                    .processEndTime(processInstance.getEndTime())
                    .processHost(processInstance.getHost())
                    .build();
            successTaskList.add(processAlertContent);
113 114
            res = JSONUtils.toJsonString(successTaskList);
        } else if (processInstance.getState().typeIsFailure()) {
L
ligang 已提交
115

116
            List<ProcessAlertContent> failedTaskList = new ArrayList<>();
117 118
            for (TaskInstance task : taskInstances) {
                if (task.getState().typeIsSuccess()) {
L
ligang 已提交
119 120
                    continue;
                }
121 122 123 124 125 126 127 128 129 130 131 132 133
                ProcessAlertContent processAlertContent = ProcessAlertContent.newBuilder()
                        .processId(processInstance.getId())
                        .processName(processInstance.getName())
                        .taskId(task.getId())
                        .taskName(task.getName())
                        .taskType(task.getTaskType())
                        .taskState(task.getState())
                        .taskStartTime(task.getStartTime())
                        .taskEndTime(task.getEndTime())
                        .taskHost(task.getHost())
                        .logPath(task.getLogPath())
                        .build();
                failedTaskList.add(processAlertContent);
L
ligang 已提交
134
            }
张世鸣 已提交
135
            res = JSONUtils.toJsonString(failedTaskList);
L
ligang 已提交
136 137 138 139 140 141
        }

        return res;
    }

    /**
142 143
     * getting worker fault tolerant content
     *
144
     * @param processInstance process instance
145 146
     * @param toleranceTaskList tolerance task list
     * @return worker tolerance content
L
ligang 已提交
147
     */
148
    private String getWorkerToleranceContent(ProcessInstance processInstance, List<TaskInstance> toleranceTaskList) {
L
ligang 已提交
149

150
        List<ProcessAlertContent> toleranceTaskInstanceList = new ArrayList<>();
L
ligang 已提交
151

152
        for (TaskInstance taskInstance : toleranceTaskList) {
153 154 155 156 157 158 159
            ProcessAlertContent processAlertContent = ProcessAlertContent.newBuilder()
                    .processName(processInstance.getName())
                    .taskName(taskInstance.getName())
                    .taskHost(taskInstance.getHost())
                    .retryTimes(taskInstance.getRetryTimes())
                    .build();
            toleranceTaskInstanceList.add(processAlertContent);
L
ligang 已提交
160
        }
张世鸣 已提交
161
        return JSONUtils.toJsonString(toleranceTaskInstanceList);
L
ligang 已提交
162 163 164 165
    }

    /**
     * send worker alert fault tolerance
166
     *
167
     * @param processInstance process instance
168
     * @param toleranceTaskList tolerance task list
L
ligang 已提交
169
     */
170 171
    public void sendAlertWorkerToleranceFault(ProcessInstance processInstance, List<TaskInstance> toleranceTaskList) {
        try {
172 173 174 175 176 177 178
            Alert alert = new Alert();
            alert.setTitle("worker fault tolerance");
            alert.setShowType(ShowType.TABLE);
            String content = getWorkerToleranceContent(processInstance, toleranceTaskList);
            alert.setContent(content);
            alert.setAlertType(AlertType.EMAIL);
            alert.setCreateTime(new Date());
179
            alert.setAlertGroupId(processInstance.getWarningGroupId() == null ? 1 : processInstance.getWarningGroupId());
180 181 182 183 184
            alert.setReceivers(processInstance.getProcessDefinition().getReceivers());
            alert.setReceiversCc(processInstance.getProcessDefinition().getReceiversCc());
            alertDao.addAlert(alert);
            logger.info("add alert to db , alert : {}", alert.toString());

185
        } catch (Exception e) {
186
            logger.error("send alert failed:{} ", e.getMessage());
187
        }
L
ligang 已提交
188 189 190 191 192

    }

    /**
     * send process instance alert
193 194 195
     *
     * @param processInstance process instance
     * @param taskInstances task instance list
L
ligang 已提交
196
     */
B
baoliang 已提交
197
    public void sendAlertProcessInstance(ProcessInstance processInstance,
198
                                         List<TaskInstance> taskInstances) {
L
ligang 已提交
199 200 201

        boolean sendWarnning = false;
        WarningType warningType = processInstance.getWarningType();
202
        switch (warningType) {
L
ligang 已提交
203
            case ALL:
204
                if (processInstance.getState().typeIsFinished()) {
L
ligang 已提交
205 206 207 208
                    sendWarnning = true;
                }
                break;
            case SUCCESS:
209
                if (processInstance.getState().typeIsSuccess()) {
L
ligang 已提交
210 211 212 213
                    sendWarnning = true;
                }
                break;
            case FAILURE:
214
                if (processInstance.getState().typeIsFailure()) {
L
ligang 已提交
215 216 217
                    sendWarnning = true;
                }
                break;
218
            default:
L
ligang 已提交
219
        }
220
        if (!sendWarnning) {
L
ligang 已提交
221 222 223 224 225
            return;
        }
        Alert alert = new Alert();

        String cmdName = getCommandCnName(processInstance.getCommandType());
226
        String success = processInstance.getState().typeIsSuccess() ? "success" : "failed";
J
'#1595'  
JinyLeeChina 已提交
227
        alert.setTitle(cmdName + " " + success);
L
ligang 已提交
228 229 230 231 232 233 234 235 236 237 238 239 240 241
        ShowType showType = processInstance.getState().typeIsSuccess() ? ShowType.TEXT : ShowType.TABLE;
        alert.setShowType(showType);
        String content = getContentProcessInstance(processInstance, taskInstances);
        alert.setContent(content);
        alert.setAlertType(AlertType.EMAIL);
        alert.setAlertGroupId(processInstance.getWarningGroupId());
        alert.setCreateTime(new Date());
        alert.setReceivers(processInstance.getProcessDefinition().getReceivers());
        alert.setReceiversCc(processInstance.getProcessDefinition().getReceiversCc());

        alertDao.addAlert(alert);
        logger.info("add alert to db , alert: {}", alert.toString());
    }

242 243 244
    /**
     * send process timeout alert
     *
245
     * @param processInstance process instance
246 247
     * @param processDefinition process definition
     */
B
baoliang 已提交
248 249 250
    public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProcessDefinition processDefinition) {
        alertDao.sendProcessTimeoutAlert(processInstance, processDefinition);
    }
L
ligang 已提交
251
}