未验证 提交 4ad34483 编写于 作者: W Wenjun Ruan 提交者: GitHub

Fix alert_send_status may throw duplicate key exception and add limit for query alert (#11953)

上级 5b6bebf7
......@@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.alert;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.alert.api.AlertChannel;
import org.apache.dolphinscheduler.alert.api.AlertConstants;
import org.apache.dolphinscheduler.alert.api.AlertData;
......@@ -34,20 +32,28 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.dao.entity.AlertSendStatus;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import com.google.common.collect.Lists;
@Service
public final class AlertSenderService extends Thread {
......@@ -71,23 +77,29 @@ public final class AlertSenderService extends Thread {
@Override
public void run() {
logger.info("alert sender started");
logger.info("Alert sender thread started");
while (!ServerLifeCycleManager.isStopped()) {
try {
List<Alert> alerts = alertDao.listPendingAlerts();
if (CollectionUtils.isEmpty(alerts)) {
logger.debug("There is not waiting alerts");
continue;
}
AlertServerMetrics.registerPendingAlertGauge(alerts::size);
this.send(alerts);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 5L);
} catch (Exception e) {
logger.error("alert sender thread error", e);
logger.error("Alert sender thread meet an exception", e);
} finally {
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 5L);
}
}
logger.info("Alert sender thread stopped");
}
public void send(List<Alert> alerts) {
for (Alert alert : alerts) {
// get alert group from alert
int alertId = Optional.ofNullable(alert.getId()).orElse(0);
int alertId = alert.getId();
int alertGroupId = Optional.ofNullable(alert.getAlertGroupId()).orElse(0);
List<AlertPluginInstance> alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId);
if (CollectionUtils.isEmpty(alertInstanceList)) {
......@@ -107,16 +119,23 @@ public final class AlertSenderService extends Thread {
.build();
int sendSuccessCount = 0;
List<AlertSendStatus> alertSendStatuses = new ArrayList<>();
List<AlertResult> alertResults = new ArrayList<>();
for (AlertPluginInstance instance : alertInstanceList) {
AlertResult alertResult = this.alertResultHandler(instance, alertData);
if (alertResult != null) {
AlertStatus sendStatus = Boolean.parseBoolean(String.valueOf(alertResult.getStatus()))
AlertStatus sendStatus = Boolean.parseBoolean(alertResult.getStatus())
? AlertStatus.EXECUTION_SUCCESS
: AlertStatus.EXECUTION_FAILURE;
alertDao.addAlertSendStatus(sendStatus, JSONUtils.toJsonString(alertResult), alertId,
instance.getId());
if (sendStatus.equals(AlertStatus.EXECUTION_SUCCESS)) {
AlertSendStatus alertSendStatus = AlertSendStatus.builder()
.alertId(alertId)
.alertPluginInstanceId(instance.getId())
.sendStatus(sendStatus)
.log(JSONUtils.toJsonString(alertResult))
.createTime(new Date())
.build();
alertSendStatuses.add(alertSendStatus);
if (AlertStatus.EXECUTION_SUCCESS.equals(sendStatus)) {
sendSuccessCount++;
AlertServerMetrics.incAlertSuccessCount();
} else {
......@@ -131,7 +150,11 @@ public final class AlertSenderService extends Thread {
} else if (sendSuccessCount < alertInstanceList.size()) {
alertStatus = AlertStatus.EXECUTION_PARTIAL_SUCCESS;
}
// We update the alert first to avoid a duplicate key in alertSendStatus.
// This may lose the alertSendStatus records if the server restarts.
// todo: use a transaction to update these two tables
alertDao.updateAlert(alertStatus, JSONUtils.toJsonString(alertResults), alertId);
alertDao.insertAlertSendStatus(alertSendStatuses);
}
}
......
......@@ -154,6 +154,7 @@ public class AlertSenderServiceTest {
String content = "alert mail test content";
List<Alert> alertList = new ArrayList<>();
Alert alert = new Alert();
alert.setId(1);
alert.setAlertGroupId(alertGroupId);
alert.setTitle(title);
alert.setContent(content);
......
......@@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.AlertSendStatusMapper;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.collections4.CollectionUtils;
import java.time.LocalDateTime;
import java.time.ZoneId;
......@@ -59,6 +60,8 @@ import com.google.common.collect.Lists;
@Component
public class AlertDao {
private static final int QUERY_ALERT_THRESHOLD = 100;
@Value("${alert.alarm-suppression.crash:60}")
private Integer crashAlarmSuppression;
......@@ -136,12 +139,19 @@ public class AlertDao {
return alertSendStatusMapper.insert(alertSendStatus);
}
/**
 * Stores a batch of alert send statuses through the send-status mapper.
 *
 * @param alertSendStatuses statuses to persist; when {@code CollectionUtils.isEmpty} reports the
 *                          list as empty, nothing is sent to the mapper
 * @return {@code 0} when there is nothing to insert, otherwise the value returned by
 *         {@code alertSendStatusMapper.batchInsert}
 */
public int insertAlertSendStatus(List<AlertSendStatus> alertSendStatuses) {
    // Skip the mapper call entirely for an empty batch.
    final boolean nothingToInsert = CollectionUtils.isEmpty(alertSendStatuses);
    return nothingToInsert ? 0 : alertSendStatusMapper.batchInsert(alertSendStatuses);
}
/**
* MasterServer or WorkerServer stopped
*
* @param alertGroupId alertGroupId
* @param host host
* @param serverType serverType
* @param host host
* @param serverType serverType
*/
public void sendServerStoppedAlert(int alertGroupId, String host, String serverType) {
ServerAlertContent serverStopAlertContent = ServerAlertContent.newBuilder().type(serverType)
......@@ -253,9 +263,7 @@ public class AlertDao {
* List alerts that are pending for execution
*/
public List<Alert> listPendingAlerts() {
LambdaQueryWrapper<Alert> wrapper = new QueryWrapper<>(new Alert()).lambda()
.eq(Alert::getAlertStatus, AlertStatus.WAIT_EXECUTION);
return alertMapper.selectList(wrapper);
return alertMapper.listingAlertByStatus(AlertStatus.WAIT_EXECUTION.getCode(), QUERY_ALERT_THRESHOLD);
}
public List<Alert> listAlerts(int processInstanceId) {
......
......@@ -24,15 +24,24 @@ import org.apache.dolphinscheduler.common.enums.WarningType;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@TableName("t_ds_alert")
public class Alert {
/**
* primary key
*/
......@@ -117,171 +126,4 @@ public class Alert {
@TableField(exist = false)
private Map<String, Object> info = new HashMap<>();
public Alert() {
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getSign() {
return sign;
}
public void setSign(String sign) {
this.sign = sign;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public AlertStatus getAlertStatus() {
return alertStatus;
}
public void setAlertStatus(AlertStatus alertStatus) {
this.alertStatus = alertStatus;
}
public String getLog() {
return log;
}
public void setLog(String log) {
this.log = log;
}
public Integer getAlertGroupId() {
return alertGroupId;
}
public void setAlertGroupId(Integer alertGroupId) {
this.alertGroupId = alertGroupId;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
public Map<String, Object> getInfo() {
return info;
}
public void setInfo(Map<String, Object> info) {
this.info = info;
}
public WarningType getWarningType() {
return warningType;
}
public void setWarningType(WarningType warningType) {
this.warningType = warningType;
}
public Long getProjectCode() {
return projectCode;
}
public void setProjectCode(Long projectCode) {
this.projectCode = projectCode;
}
public Long getProcessDefinitionCode() {
return processDefinitionCode;
}
public void setProcessDefinitionCode(Long processDefinitionCode) {
this.processDefinitionCode = processDefinitionCode;
}
public Integer getProcessInstanceId() {
return processInstanceId;
}
public void setProcessInstanceId(Integer processInstanceId) {
this.processInstanceId = processInstanceId;
}
public AlertType getAlertType() {
return alertType;
}
public void setAlertType(AlertType alertType) {
this.alertType = alertType;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Alert alert = (Alert) o;
return Objects.equals(id, alert.id)
&& Objects.equals(alertGroupId, alert.alertGroupId)
&& Objects.equals(sign, alert.sign)
&& Objects.equals(title, alert.title)
&& Objects.equals(content, alert.content)
&& alertStatus == alert.alertStatus
&& warningType == alert.warningType
&& Objects.equals(log, alert.log)
&& Objects.equals(createTime, alert.createTime)
&& Objects.equals(updateTime, alert.updateTime)
&& Objects.equals(info, alert.info)
;
}
@Override
public int hashCode() {
return Objects.hash(id, sign, title, content, alertStatus, warningType, log, alertGroupId, createTime, updateTime, info);
}
@Override
public String toString() {
return "Alert{"
+ "id=" + id
+ ", sign='" + sign + '\''
+ ", title='" + title + '\''
+ ", content='" + content + '\''
+ ", alertStatus=" + alertStatus
+ ", warningType=" + warningType
+ ", log='" + log + '\''
+ ", alertGroupId=" + alertGroupId
+ ", createTime=" + createTime
+ ", updateTime=" + updateTime
+ ", info=" + info
+ '}';
}
}
......@@ -21,15 +21,20 @@ import org.apache.dolphinscheduler.common.enums.AlertStatus;
import java.util.Date;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.google.common.base.Objects;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@TableName("t_ds_alert_send_status")
public class AlertSendStatus {
......@@ -69,20 +74,4 @@ public class AlertSendStatus {
@TableField("create_time")
private Date createTime;
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
AlertSendStatus that = (AlertSendStatus) o;
return alertId == that.alertId && alertPluginInstanceId == that.alertPluginInstanceId;
}
@Override
public int hashCode() {
return Objects.hashCode(alertId, alertPluginInstanceId);
}
}
......@@ -23,6 +23,7 @@ import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.Date;
import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
......@@ -32,10 +33,16 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
@Mapper
public interface AlertMapper extends BaseMapper<Alert> {
/**
* Query alerts by alertStatus, returning at most {@code limit} rows in the default sort order.
*/
List<Alert> listingAlertByStatus(@Param("alertStatus") int alertStatus, @Param("limit") int limit);
/**
* Insert server crash alert
* <p>This method will ensure that there is at most one unsent alert which has the same content in the database.
*/
void insertAlertWhenServerCrash(@Param("alert") Alert alert, @Param("crashAlarmSuppressionStartTime") Date crashAlarmSuppressionStartTime);
void insertAlertWhenServerCrash(@Param("alert") Alert alert,
@Param("crashAlarmSuppressionStartTime") Date crashAlarmSuppressionStartTime);
}
......@@ -19,7 +19,11 @@ package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.AlertSendStatus;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
 * MyBatis mapper for {@link AlertSendStatus} rows.
 */
public interface AlertSendStatusMapper extends BaseMapper<AlertSendStatus> {

    /**
     * Inserts all given send statuses with the {@code batchInsert} mapped statement.
     *
     * <p>The parameter is bound explicitly as {@code alertSendStatuses} so that the
     * {@code collection="alertSendStatuses"} reference in the mapper XML resolves regardless of
     * whether parameter names are retained at compile time.
     *
     * @param alertSendStatuses statuses to insert
     * @return the row count reported by the insert statement
     */
    int batchInsert(@Param("alertSendStatuses") List<AlertSendStatus> alertSendStatuses);
}
......@@ -18,13 +18,38 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.AlertMapper">
<!-- Reusable column list for t_ds_alert queries; pulled into statements with <include refid="baseSql"/>. -->
<sql id="baseSql">
id
, sign, title, content, alert_status, warning_type, log, alertgroup_id, create_time, update_time, project_code,
process_definition_code, process_instance_id, alert_type
</sql>
<insert id="insertAlertWhenServerCrash">
insert into t_ds_alert(sign, title, content, alert_status, warning_type, log, alertgroup_id, create_time, update_time, alert_type)
SELECT #{alert.sign}, #{alert.title}, #{alert.content}, #{alert.alertStatus.code}, #{alert.warningType.code},
#{alert.log}, #{alert.alertGroupId}, #{alert.createTime}, #{alert.updateTime}, #{alert.alertType.code}
insert into t_ds_alert(sign, title, content, alert_status, warning_type, log, alertgroup_id, create_time,
update_time, alert_type)
SELECT #{alert.sign},
#{alert.title},
#{alert.content},
#{alert.alertStatus.code},
#{alert.warningType.code},
#{alert.log},
#{alert.alertGroupId},
#{alert.createTime},
#{alert.updateTime},
#{alert.alertType.code}
from t_ds_alert
where create_time >= #{crashAlarmSuppressionStartTime} and sign = #{alert.sign} and alert_status = #{alert.alertStatus.code}
where create_time >= #{crashAlarmSuppressionStartTime}
and sign = #{alert.sign}
and alert_status = #{alert.alertStatus.code}
having count(*) = 0
</insert>
<!-- Selects alerts whose alert_status equals the alertStatus parameter, returning at most "limit" rows.
     No ORDER BY is specified, so the row order is whatever the database returns. -->
<select id="listingAlertByStatus" resultType="org.apache.dolphinscheduler.dao.entity.Alert">
select
<include refid="baseSql"/>
from t_ds_alert
where alert_status = #{alertStatus}
limit #{limit}
</select>
</mapper>
<?xml version="1.0" encoding="UTF-8" ?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.AlertSendStatusMapper">
<!--
  Inserts every element of the "alertSendStatuses" collection with one multi-row INSERT.
  Each element is rendered as its own parenthesised value tuple, and the foreach separator
  places a comma between tuples: values (..), (..), (..)
-->
<insert id="batchInsert">
    insert into t_ds_alert_send_status (alert_id, alert_plugin_instance_id, send_status, log, create_time)
    values
    <foreach collection="alertSendStatuses" item="alertSendStatus" separator=",">
        (#{alertSendStatus.alertId},
        #{alertSendStatus.alertPluginInstanceId},
        #{alertSendStatus.sendStatus},
        #{alertSendStatus.log},
        #{alertSendStatus.createTime})
    </foreach>
</insert>
</mapper>
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册