未验证 提交 8e21c38c 编写于 作者: W Wenjun Ruan 提交者: GitHub

Write alert result into db (#11221)

上级 01fc6c8a
...@@ -25,8 +25,13 @@ package org.apache.dolphinscheduler.alert.api; ...@@ -25,8 +25,13 @@ package org.apache.dolphinscheduler.alert.api;
public interface AlertChannel { public interface AlertChannel {
/** /**
* process and send alert * process and send alert
*
* @param info alert info * @param info alert info
* @return process alarm result * @return process alarm result
*/ */
AlertResult process(AlertInfo info); AlertResult process(AlertInfo info);
default AlertResult closeAlert(AlertInfo info) {
return new AlertResult("true", "no need to close alert");
}
} }
...@@ -61,4 +61,9 @@ public class AlertData { ...@@ -61,4 +61,9 @@ public class AlertData {
*/ */
private int warnType; private int warnType;
/**
* AlertType#code
*/
private int alertType;
} }
...@@ -20,24 +20,27 @@ ...@@ -20,24 +20,27 @@
package org.apache.dolphinscheduler.alert.api; package org.apache.dolphinscheduler.alert.api;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
/** /**
* alert result * alert result
*/ */
@Builder
@AllArgsConstructor @AllArgsConstructor
@Data @Data
@NoArgsConstructor @NoArgsConstructor
public class AlertResult { public class AlertResult {
/** /**
* todo: use enum
* false or true * false or true
*/ */
private String status; private String status;
/** /**
* alert result message * alert result message, each plugin can have its own message
*/ */
private String message; private String message;
......
...@@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.alert.api.AlertInfo; ...@@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.alert.api.AlertInfo;
import org.apache.dolphinscheduler.alert.api.AlertResult; import org.apache.dolphinscheduler.alert.api.AlertResult;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AlertStatus; import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
...@@ -43,10 +44,14 @@ import java.util.Optional; ...@@ -43,10 +44,14 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.google.common.collect.Lists;
@Service @Service
public final class AlertSenderService extends Thread { public final class AlertSenderService extends Thread {
private static final Logger logger = LoggerFactory.getLogger(AlertSenderService.class); private static final Logger logger = LoggerFactory.getLogger(AlertSenderService.class);
...@@ -89,7 +94,9 @@ public final class AlertSenderService extends Thread { ...@@ -89,7 +94,9 @@ public final class AlertSenderService extends Thread {
List<AlertPluginInstance> alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId); List<AlertPluginInstance> alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId);
if (CollectionUtils.isEmpty(alertInstanceList)) { if (CollectionUtils.isEmpty(alertInstanceList)) {
logger.error("send alert msg fail,no bind plugin instance."); logger.error("send alert msg fail,no bind plugin instance.");
alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, "no bind plugin instance", alertId); List<AlertResult> alertResults = Lists.newArrayList(new AlertResult("false",
"no bind plugin instance"));
alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, JSONUtils.toJsonString(alertResults), alertId);
continue; continue;
} }
AlertData alertData = AlertData.builder() AlertData alertData = AlertData.builder()
...@@ -98,17 +105,20 @@ public final class AlertSenderService extends Thread { ...@@ -98,17 +105,20 @@ public final class AlertSenderService extends Thread {
.log(alert.getLog()) .log(alert.getLog())
.title(alert.getTitle()) .title(alert.getTitle())
.warnType(alert.getWarningType().getCode()) .warnType(alert.getWarningType().getCode())
.alertType(alert.getAlertType().getCode())
.build(); .build();
int sendSuccessCount = 0; int sendSuccessCount = 0;
List<AlertResult> alertResults = new ArrayList<>();
for (AlertPluginInstance instance : alertInstanceList) { for (AlertPluginInstance instance : alertInstanceList) {
AlertResult alertResult = this.alertResultHandler(instance, alertData); AlertResult alertResult = this.alertResultHandler(instance, alertData);
if (alertResult != null) { if (alertResult != null) {
AlertStatus sendStatus = Boolean.parseBoolean(String.valueOf(alertResult.getStatus())) ? AlertStatus.EXECUTION_SUCCESS : AlertStatus.EXECUTION_FAILURE; AlertStatus sendStatus = Boolean.parseBoolean(String.valueOf(alertResult.getStatus())) ? AlertStatus.EXECUTION_SUCCESS : AlertStatus.EXECUTION_FAILURE;
alertDao.addAlertSendStatus(sendStatus,alertResult.getMessage(),alertId,instance.getId()); alertDao.addAlertSendStatus(sendStatus, JSONUtils.toJsonString(alertResult), alertId, instance.getId());
if (sendStatus.equals(AlertStatus.EXECUTION_SUCCESS)) { if (sendStatus.equals(AlertStatus.EXECUTION_SUCCESS)) {
sendSuccessCount++; sendSuccessCount++;
} }
alertResults.add(alertResult);
} }
} }
AlertStatus alertStatus = AlertStatus.EXECUTION_SUCCESS; AlertStatus alertStatus = AlertStatus.EXECUTION_SUCCESS;
...@@ -117,7 +127,7 @@ public final class AlertSenderService extends Thread { ...@@ -117,7 +127,7 @@ public final class AlertSenderService extends Thread {
} else if (sendSuccessCount < alertInstanceList.size()) { } else if (sendSuccessCount < alertInstanceList.size()) {
alertStatus = AlertStatus.EXECUTION_PARTIAL_SUCCESS; alertStatus = AlertStatus.EXECUTION_PARTIAL_SUCCESS;
} }
alertDao.updateAlert(alertStatus, "", alertId); alertDao.updateAlert(alertStatus, JSONUtils.toJsonString(alertResults), alertId);
} }
} }
...@@ -170,17 +180,18 @@ public final class AlertSenderService extends Thread { ...@@ -170,17 +180,18 @@ public final class AlertSenderService extends Thread {
* @param alertData alertData * @param alertData alertData
* @return AlertResult * @return AlertResult
*/ */
private AlertResult alertResultHandler(AlertPluginInstance instance, AlertData alertData) { private @Nullable AlertResult alertResultHandler(AlertPluginInstance instance, AlertData alertData) {
Optional<AlertChannel> alertChannel = alertPluginManager.getAlertChannel(instance.getPluginDefineId());
AlertResult alertResultExtend = new AlertResult();
String pluginInstanceName = instance.getInstanceName(); String pluginInstanceName = instance.getInstanceName();
if (!alertChannel.isPresent()) { int pluginDefineId = instance.getPluginDefineId();
String message = String.format("Alert Plugin %s send error : return value is null", pluginInstanceName); Optional<AlertChannel> alertChannelOptional = alertPluginManager.getAlertChannel(instance.getPluginDefineId());
alertResultExtend.setStatus(String.valueOf(false)); if (!alertChannelOptional.isPresent()) {
alertResultExtend.setMessage(message); String message = String.format("Alert Plugin %s send error: the channel doesn't exist, pluginDefineId: %s",
logger.error("Alert Plugin {} send error : not found plugin {}", pluginInstanceName, instance.getPluginDefineId()); pluginInstanceName,
return alertResultExtend; pluginDefineId);
logger.error("Alert Plugin {} send error : not found plugin {}", pluginInstanceName, pluginDefineId);
return new AlertResult("false", message);
} }
AlertChannel alertChannel = alertChannelOptional.get();
Map<String, String> paramsMap = JSONUtils.toMap(instance.getPluginInstanceParams()); Map<String, String> paramsMap = JSONUtils.toMap(instance.getPluginInstanceParams());
String instanceWarnType = WarningType.ALL.getDescp(); String instanceWarnType = WarningType.ALL.getDescp();
...@@ -193,10 +204,8 @@ public final class AlertSenderService extends Thread { ...@@ -193,10 +204,8 @@ public final class AlertSenderService extends Thread {
if (warningType == null) { if (warningType == null) {
String message = String.format("Alert Plugin %s send error : plugin warnType is null", pluginInstanceName); String message = String.format("Alert Plugin %s send error : plugin warnType is null", pluginInstanceName);
alertResultExtend.setStatus(String.valueOf(false));
alertResultExtend.setMessage(message);
logger.error("Alert Plugin {} send error : plugin warnType is null", pluginInstanceName); logger.error("Alert Plugin {} send error : plugin warnType is null", pluginInstanceName);
return alertResultExtend; return new AlertResult("false", message);
} }
boolean sendWarning = false; boolean sendWarning = false;
...@@ -231,10 +240,18 @@ public final class AlertSenderService extends Thread { ...@@ -231,10 +240,18 @@ public final class AlertSenderService extends Thread {
AlertResult alertResult; AlertResult alertResult;
try { try {
if (waitTimeout <= 0) { if (waitTimeout <= 0) {
alertResult = alertChannel.get().process(alertInfo); if (alertData.getAlertType() == AlertType.CLOSE_ALERT.getCode()) {
alertResult = alertChannel.closeAlert(alertInfo);
} else {
alertResult = alertChannel.process(alertInfo);
}
} else { } else {
CompletableFuture<AlertResult> future = CompletableFuture<AlertResult> future;
CompletableFuture.supplyAsync(() -> alertChannel.get().process(alertInfo)); if (alertData.getAlertType() == AlertType.CLOSE_ALERT.getCode()) {
future = CompletableFuture.supplyAsync(() -> alertChannel.closeAlert(alertInfo));
} else {
future = CompletableFuture.supplyAsync(() -> alertChannel.process(alertInfo));
}
alertResult = future.get(waitTimeout, TimeUnit.MILLISECONDS); alertResult = future.get(waitTimeout, TimeUnit.MILLISECONDS);
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
...@@ -246,18 +263,19 @@ public final class AlertSenderService extends Thread { ...@@ -246,18 +263,19 @@ public final class AlertSenderService extends Thread {
logger.error("send alert error alert data id :{},", alertData.getId(), e); logger.error("send alert error alert data id :{},", alertData.getId(), e);
} }
AlertResult alertResultExtend = new AlertResult();
if (alertResult == null) { if (alertResult == null) {
String message = String.format("Alert Plugin %s send error : return alertResult value is null", pluginInstanceName); String message = String.format("Alert Plugin %s send error : return alertResult value is null", pluginInstanceName);
alertResultExtend.setStatus(String.valueOf(false)); alertResultExtend.setStatus("false");
alertResultExtend.setMessage(message); alertResultExtend.setMessage(message);
logger.info("Alert Plugin {} send error : return alertResult value is null", pluginInstanceName); logger.info("Alert Plugin {} send error : return alertResult value is null", pluginInstanceName);
} else if (!Boolean.parseBoolean(String.valueOf(alertResult.getStatus()))) { } else if (!Boolean.parseBoolean(String.valueOf(alertResult.getStatus()))) {
alertResultExtend.setStatus(String.valueOf(false)); alertResultExtend.setStatus("false");
alertResultExtend.setMessage(alertResult.getMessage()); alertResultExtend.setMessage(alertResult.getMessage());
logger.info("Alert Plugin {} send error : {}", pluginInstanceName, alertResult.getMessage()); logger.info("Alert Plugin {} send error : {}", pluginInstanceName, alertResult.getMessage());
} else { } else {
String message = String.format("Alert Plugin %s send success", pluginInstanceName); String message = String.format("Alert Plugin %s send success", pluginInstanceName);
alertResultExtend.setStatus(String.valueOf(true)); alertResultExtend.setStatus("true");
alertResultExtend.setMessage(message); alertResultExtend.setMessage(message);
logger.info("Alert Plugin {} send success", pluginInstanceName); logger.info("Alert Plugin {} send success", pluginInstanceName);
} }
......
...@@ -25,7 +25,7 @@ import com.baomidou.mybatisplus.annotation.EnumValue; ...@@ -25,7 +25,7 @@ import com.baomidou.mybatisplus.annotation.EnumValue;
public enum AlertType { public enum AlertType {
/** /**
* 0 process instance failure, 1 process instance success, 2 process instance blocked, 3 process instance timeout, 4 fault tolerance warning, * 0 process instance failure, 1 process instance success, 2 process instance blocked, 3 process instance timeout, 4 fault tolerance warning,
* 5 task failure, 6 task success, 7 task timeout * 5 task failure, 6 task success, 7 task timeout, 8 close alert
*/ */
PROCESS_INSTANCE_FAILURE(0, "process instance failure"), PROCESS_INSTANCE_FAILURE(0, "process instance failure"),
PROCESS_INSTANCE_SUCCESS(1, "process instance success"), PROCESS_INSTANCE_SUCCESS(1, "process instance success"),
...@@ -34,7 +34,10 @@ public enum AlertType { ...@@ -34,7 +34,10 @@ public enum AlertType {
FAULT_TOLERANCE_WARNING(4, "fault tolerance warning"), FAULT_TOLERANCE_WARNING(4, "fault tolerance warning"),
TASK_FAILURE(5, "task failure"), TASK_FAILURE(5, "task failure"),
TASK_SUCCESS(6, "task success"), TASK_SUCCESS(6, "task success"),
TASK_TIMEOUT(7, "task timeout"),; TASK_TIMEOUT(7, "task timeout"),
CLOSE_ALERT(8, "the process instance success, can close the before alert")
;
AlertType(int code, String descp) { AlertType(int code, String descp) {
this.code = code; this.code = code;
......
...@@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper; ...@@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.AlertSendStatusMapper; import org.apache.dolphinscheduler.dao.mapper.AlertSendStatusMapper;
import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.codec.digest.DigestUtils;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneId; import java.time.ZoneId;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -89,7 +90,7 @@ public class AlertDao { ...@@ -89,7 +90,7 @@ public class AlertDao {
* update alert sending(execution) status * update alert sending(execution) status
* *
* @param alertStatus alertStatus * @param alertStatus alertStatus
* @param log log * @param log alert results json
* @param id id * @param id id
* @return update alert result * @return update alert result
*/ */
...@@ -257,6 +258,12 @@ public class AlertDao { ...@@ -257,6 +258,12 @@ public class AlertDao {
return alertMapper.selectList(wrapper); return alertMapper.selectList(wrapper);
} }
public List<Alert> listAlerts(int processInstanceId) {
LambdaQueryWrapper<Alert> wrapper = new QueryWrapper<>(new Alert()).lambda()
.eq(Alert::getProcessInstanceId, processInstanceId);
return alertMapper.selectList(wrapper);
}
/** /**
* for test * for test
* *
......
...@@ -692,16 +692,20 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> { ...@@ -692,16 +692,20 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
*/ */
public void endProcess() { public void endProcess() {
this.stateEvents.clear(); this.stateEvents.clear();
if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType().typeIsSerialPriority()) { if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType()
.typeIsSerialPriority()) {
checkSerialProcess(processDefinition); checkSerialProcess(processDefinition);
} }
if (processInstance.getState().typeIsWaitingThread()) { if (processInstance.getState().typeIsWaitingThread()) {
processService.createRecoveryWaitingThreadCommand(null, processInstance); processService.createRecoveryWaitingThreadCommand(null, processInstance);
} }
if (processAlertManager.isNeedToSendWarning(processInstance)) {
ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
if (processAlertManager.isNeedToSendWarning(processInstance)) {
processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), projectUser); processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), projectUser);
} }
if (processInstance.getState().typeIsSuccess()) {
processAlertManager.closeAlert(processInstance);
}
if (checkTaskQueue()) { if (checkTaskQueue()) {
//release task group //release task group
processService.releaseAllTaskGroup(processInstance.getId()); processService.releaseAllTaskGroup(processInstance.getId());
......
...@@ -33,6 +33,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskAlertContent; ...@@ -33,6 +33,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskAlertContent;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqTaskState; import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqTaskState;
import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
...@@ -270,6 +272,29 @@ public class ProcessAlertManager { ...@@ -270,6 +272,29 @@ public class ProcessAlertManager {
return sendWarning; return sendWarning;
} }
/**
* Send a close alert event, if the processInstance has sent alert before, then will insert a closed event.
*
* @param processInstance success process instance
*/
public void closeAlert(ProcessInstance processInstance) {
List<Alert> alerts = alertDao.listAlerts(processInstance.getId());
if (CollectionUtils.isEmpty(alerts)) {
// no need to close alert
return;
}
Alert alert = new Alert();
alert.setAlertGroupId(processInstance.getWarningGroupId());
alert.setUpdateTime(new Date());
alert.setCreateTime(new Date());
alert.setProjectCode(processInstance.getProcessDefinition().getProjectCode());
alert.setProcessDefinitionCode(processInstance.getProcessDefinitionCode());
alert.setProcessInstanceId(processInstance.getId());
alert.setAlertType(AlertType.CLOSE_ALERT);
alertDao.addAlert(alert);
}
/** /**
* send process timeout alert * send process timeout alert
* *
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册