From 8e21c38c0041aa2689eb9d9bc703f67292c98e88 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Mon, 1 Aug 2022 13:04:27 +0800 Subject: [PATCH] Write alert result into db (#11221) --- .../alert/api/AlertChannel.java | 5 ++ .../dolphinscheduler/alert/api/AlertData.java | 5 ++ .../alert/api/AlertResult.java | 5 +- .../alert/AlertSenderService.java | 72 ++++++++++++------- .../common/enums/AlertType.java | 7 +- .../apache/dolphinscheduler/dao/AlertDao.java | 11 ++- .../runner/WorkflowExecuteRunnable.java | 8 ++- .../service/alert/ProcessAlertManager.java | 27 ++++++- 8 files changed, 105 insertions(+), 35 deletions(-) diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertChannel.java b/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertChannel.java index 14dca78f6..56c45362d 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertChannel.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertChannel.java @@ -25,8 +25,13 @@ package org.apache.dolphinscheduler.alert.api; public interface AlertChannel { /** * process and send alert + * * @param info alert info * @return process alarm result */ AlertResult process(AlertInfo info); + + default AlertResult closeAlert(AlertInfo info) { + return new AlertResult("true", "no need to close alert"); + } } diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertData.java b/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertData.java index 50b6c2cc7..37a3f3357 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertData.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertData.java @@ -61,4 +61,9 @@ public class AlertData { */ private int warnType; + /** + * AlertType#code + */ + private int alertType; + } diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertResult.java b/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertResult.java index 734d51779..b6c5db38e 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertResult.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertResult.java @@ -20,24 +20,27 @@ package org.apache.dolphinscheduler.alert.api; import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; /** * alert result */ +@Builder @AllArgsConstructor @Data @NoArgsConstructor public class AlertResult { /** + * todo: use enum * false or true */ private String status; /** - * alert result message + * alert result message, each plugin can have its own message */ private String message; diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java index 2f9157932..fe2e1aaf4 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java @@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.alert.api.AlertInfo; import org.apache.dolphinscheduler.alert.api.AlertResult; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AlertStatus; +import org.apache.dolphinscheduler.common.enums.AlertType; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; @@ -43,10 +44,14 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; +import com.google.common.collect.Lists; + @Service public final class AlertSenderService extends Thread { private static final Logger logger = LoggerFactory.getLogger(AlertSenderService.class); @@ -89,26 +94,31 @@ public final class AlertSenderService extends Thread { List alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId); if (CollectionUtils.isEmpty(alertInstanceList)) { logger.error("send alert msg fail,no bind plugin instance."); - alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, "no bind plugin instance", alertId); + List alertResults = Lists.newArrayList(new AlertResult("false", + "no bind plugin instance")); + alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, JSONUtils.toJsonString(alertResults), alertId); continue; } AlertData alertData = AlertData.builder() - .id(alertId) - .content(alert.getContent()) - .log(alert.getLog()) - .title(alert.getTitle()) - .warnType(alert.getWarningType().getCode()) - .build(); + .id(alertId) + .content(alert.getContent()) + .log(alert.getLog()) + .title(alert.getTitle()) + .warnType(alert.getWarningType().getCode()) + .alertType(alert.getAlertType().getCode()) + .build(); int sendSuccessCount = 0; + List alertResults = new ArrayList<>(); for (AlertPluginInstance instance : alertInstanceList) { AlertResult alertResult = this.alertResultHandler(instance, alertData); if (alertResult != null) { AlertStatus sendStatus = Boolean.parseBoolean(String.valueOf(alertResult.getStatus())) ? AlertStatus.EXECUTION_SUCCESS : AlertStatus.EXECUTION_FAILURE; - alertDao.addAlertSendStatus(sendStatus,alertResult.getMessage(),alertId,instance.getId()); + alertDao.addAlertSendStatus(sendStatus, JSONUtils.toJsonString(alertResult), alertId, instance.getId()); if (sendStatus.equals(AlertStatus.EXECUTION_SUCCESS)) { sendSuccessCount++; } + alertResults.add(alertResult); } } AlertStatus alertStatus = AlertStatus.EXECUTION_SUCCESS; @@ -117,7 +127,7 @@ public final class AlertSenderService extends Thread { } else if (sendSuccessCount < alertInstanceList.size()) { alertStatus = AlertStatus.EXECUTION_PARTIAL_SUCCESS; } - alertDao.updateAlert(alertStatus, "", alertId); + alertDao.updateAlert(alertStatus, JSONUtils.toJsonString(alertResults), alertId); } } @@ -170,17 +180,18 @@ public final class AlertSenderService extends Thread { * @param alertData alertData * @return AlertResult */ - private AlertResult alertResultHandler(AlertPluginInstance instance, AlertData alertData) { - Optional alertChannel = alertPluginManager.getAlertChannel(instance.getPluginDefineId()); - AlertResult alertResultExtend = new AlertResult(); + private @Nullable AlertResult alertResultHandler(AlertPluginInstance instance, AlertData alertData) { String pluginInstanceName = instance.getInstanceName(); - if (!alertChannel.isPresent()) { - String message = String.format("Alert Plugin %s send error : return value is null", pluginInstanceName); - alertResultExtend.setStatus(String.valueOf(false)); - alertResultExtend.setMessage(message); - logger.error("Alert Plugin {} send error : not found plugin {}", pluginInstanceName, instance.getPluginDefineId()); - return alertResultExtend; + int pluginDefineId = instance.getPluginDefineId(); + Optional alertChannelOptional = alertPluginManager.getAlertChannel(instance.getPluginDefineId()); + if (!alertChannelOptional.isPresent()) { + String message = String.format("Alert Plugin %s send error: the channel doesn't exist, pluginDefineId: %s", + pluginInstanceName, + pluginDefineId); + logger.error("Alert Plugin {} send error : not found plugin {}", pluginInstanceName, pluginDefineId); + return new AlertResult("false", message); } + AlertChannel alertChannel = alertChannelOptional.get(); Map paramsMap = JSONUtils.toMap(instance.getPluginInstanceParams()); String instanceWarnType = WarningType.ALL.getDescp(); @@ -193,10 +204,8 @@ public final class AlertSenderService extends Thread { if (warningType == null) { String message = String.format("Alert Plugin %s send error : plugin warnType is null", pluginInstanceName); - alertResultExtend.setStatus(String.valueOf(false)); - alertResultExtend.setMessage(message); logger.error("Alert Plugin {} send error : plugin warnType is null", pluginInstanceName); - return alertResultExtend; + return new AlertResult("false", message); } boolean sendWarning = false; @@ -231,10 +240,18 @@ public final class AlertSenderService extends Thread { AlertResult alertResult; try { if (waitTimeout <= 0) { - alertResult = alertChannel.get().process(alertInfo); + if (alertData.getAlertType() == AlertType.CLOSE_ALERT.getCode()) { + alertResult = alertChannel.closeAlert(alertInfo); + } else { + alertResult = alertChannel.process(alertInfo); + } } else { - CompletableFuture future = - CompletableFuture.supplyAsync(() -> alertChannel.get().process(alertInfo)); + CompletableFuture future; + if (alertData.getAlertType() == AlertType.CLOSE_ALERT.getCode()) { + future = CompletableFuture.supplyAsync(() -> alertChannel.closeAlert(alertInfo)); + } else { + future = CompletableFuture.supplyAsync(() -> alertChannel.process(alertInfo)); + } alertResult = future.get(waitTimeout, TimeUnit.MILLISECONDS); } } catch (InterruptedException e) { @@ -246,18 +263,19 @@ public final class AlertSenderService extends Thread { logger.error("send alert error alert data id :{},", alertData.getId(), e); } + AlertResult alertResultExtend = new AlertResult(); if (alertResult == null) { String message = String.format("Alert Plugin %s send error : return alertResult value is null", pluginInstanceName); - alertResultExtend.setStatus(String.valueOf(false)); + alertResultExtend.setStatus("false"); alertResultExtend.setMessage(message); logger.info("Alert Plugin {} send error : return alertResult value is null", pluginInstanceName); } else if (!Boolean.parseBoolean(String.valueOf(alertResult.getStatus()))) { - alertResultExtend.setStatus(String.valueOf(false)); + alertResultExtend.setStatus("false"); alertResultExtend.setMessage(alertResult.getMessage()); logger.info("Alert Plugin {} send error : {}", pluginInstanceName, alertResult.getMessage()); } else { String message = String.format("Alert Plugin %s send success", pluginInstanceName); - alertResultExtend.setStatus(String.valueOf(true)); + alertResultExtend.setStatus("true"); alertResultExtend.setMessage(message); logger.info("Alert Plugin {} send success", pluginInstanceName); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java index 8b97dbdf9..d5713a775 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java @@ -25,7 +25,7 @@ import com.baomidou.mybatisplus.annotation.EnumValue; public enum AlertType { /** * 0 process instance failure, 1 process instance success, 2 process instance blocked, 3 process instance timeout, 4 fault tolerance warning, - * 5 task failure, 6 task success, 7 task timeout + * 5 task failure, 6 task success, 7 task timeout, 8 close alert */ PROCESS_INSTANCE_FAILURE(0, "process instance failure"), PROCESS_INSTANCE_SUCCESS(1, "process instance success"), @@ -34,7 +34,10 @@ public enum AlertType { FAULT_TOLERANCE_WARNING(4, "fault tolerance warning"), TASK_FAILURE(5, "task failure"), TASK_SUCCESS(6, "task success"), - TASK_TIMEOUT(7, "task timeout"),; + TASK_TIMEOUT(7, "task timeout"), + + CLOSE_ALERT(8, "the process instance success, can close the before alert") + ; AlertType(int code, String descp) { this.code = code; diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java index 76fb17e14..018cea482 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java @@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.AlertSendStatusMapper; import org.apache.commons.codec.digest.DigestUtils; + import java.time.LocalDateTime; import java.time.ZoneId; import java.util.ArrayList; @@ -89,7 +90,7 @@ public class AlertDao { * update alert sending(execution) status * * @param alertStatus alertStatus - * @param log log + * @param log alert results json * @param id id * @return update alert result */ @@ -253,7 +254,13 @@ public class AlertDao { */ public List listPendingAlerts() { LambdaQueryWrapper wrapper = new QueryWrapper<>(new Alert()).lambda() - .eq(Alert::getAlertStatus, AlertStatus.WAIT_EXECUTION); + .eq(Alert::getAlertStatus, AlertStatus.WAIT_EXECUTION); + return alertMapper.selectList(wrapper); + } + + public List listAlerts(int processInstanceId) { + LambdaQueryWrapper wrapper = new QueryWrapper<>(new Alert()).lambda() + .eq(Alert::getProcessInstanceId, processInstanceId); return alertMapper.selectList(wrapper); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 27f90d3d0..f2965f79f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -692,16 +692,20 @@ public class WorkflowExecuteRunnable implements Callable { */ public void endProcess() { this.stateEvents.clear(); - if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType().typeIsSerialPriority()) { + if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType() + .typeIsSerialPriority()) { checkSerialProcess(processDefinition); } if (processInstance.getState().typeIsWaitingThread()) { processService.createRecoveryWaitingThreadCommand(null, processInstance); } + ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); if (processAlertManager.isNeedToSendWarning(processInstance)) { - ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), projectUser); } + if (processInstance.getState().typeIsSuccess()) { + processAlertManager.closeAlert(processInstance); + } if (checkTaskQueue()) { //release task group processService.releaseAllTaskGroup(processInstance.getId()); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java index 4fca7b47c..9296aebe2 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java @@ -33,6 +33,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskAlertContent; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqTaskState; +import org.apache.commons.collections4.CollectionUtils; + import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -270,11 +272,34 @@ public class ProcessAlertManager { return sendWarning; } + /** + * Send a close alert event, if the processInstance has sent alert before, then will insert a closed event. + * + * @param processInstance success process instance + */ + public void closeAlert(ProcessInstance processInstance) { + List alerts = alertDao.listAlerts(processInstance.getId()); + if (CollectionUtils.isEmpty(alerts)) { + // no need to close alert + return; + } + + Alert alert = new Alert(); + alert.setAlertGroupId(processInstance.getWarningGroupId()); + alert.setUpdateTime(new Date()); + alert.setCreateTime(new Date()); + alert.setProjectCode(processInstance.getProcessDefinition().getProjectCode()); + alert.setProcessDefinitionCode(processInstance.getProcessDefinitionCode()); + alert.setProcessInstanceId(processInstance.getId()); + alert.setAlertType(AlertType.CLOSE_ALERT); + alertDao.addAlert(alert); + } + /** * send process timeout alert * * @param processInstance process instance - * @param projectUser projectUser + * @param projectUser projectUser */ public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProjectUser projectUser) { alertDao.sendProcessTimeoutAlert(processInstance, projectUser); -- GitLab