diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java index 5b66932a36b273e5b839cbfa8051c8f1af7e8da1..6787b8c386ea00b64061603e06b01373e1f0ef9e 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java @@ -44,6 +44,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import com.google.common.collect.Lists; + @Component public class AlertDao extends AbstractBaseDao { @@ -99,15 +101,23 @@ public class AlertDao extends AbstractBaseDao { * @param serverType serverType */ public void sendServerStopedAlert(int alertGroupId, String host, String serverType) { - Alert alert = new Alert(); - List serverAlertContents = new ArrayList<>(1); ServerAlertContent serverStopAlertContent = ServerAlertContent.newBuilder(). - type(serverType).host(host).event(AlertEvent.SERVER_DOWN).warningLevel(AlertWarnLevel.SERIOUS). + type(serverType) + .host(host) + .event(AlertEvent.SERVER_DOWN) + .warningLevel(AlertWarnLevel.SERIOUS). build(); - serverAlertContents.add(serverStopAlertContent); - String content = JSONUtils.toJsonString(serverAlertContents); + String content = JSONUtils.toJsonString(Lists.newArrayList(serverStopAlertContent)); + + Alert alert = new Alert(); alert.setTitle("Fault tolerance warning"); - saveTaskTimeoutAlert(alert, content, alertGroupId); + alert.setAlertStatus(AlertStatus.WAIT_EXECUTION); + alert.setContent(content); + alert.setAlertGroupId(alertGroupId); + alert.setCreateTime(new Date()); + alert.setUpdateTime(new Date()); + // we use this method to avoid insert duplicate alert(issue #5525) + alertMapper.insertAlertWhenServerCrash(alert); } /** diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertMapper.java index d97e16d19dc21b8bcae5a9e4bada31f2f5ecf7cd..77786c5a1e70b3b270f7ea15047fcc86c954df29 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertMapper.java @@ -14,15 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.dao.mapper; import org.apache.dolphinscheduler.common.enums.AlertStatus; import org.apache.dolphinscheduler.dao.entity.Alert; -import com.baomidou.mybatisplus.core.mapper.BaseMapper; + import org.apache.ibatis.annotations.Param; import java.util.List; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + /** * alert mapper interface */ @@ -35,4 +38,10 @@ public interface AlertMapper extends BaseMapper { */ List listAlertByStatus(@Param("alertStatus") AlertStatus alertStatus); + /** + * Insert server crash alert + *

This method will ensure that there is at most one unsent alert which has the same content in the database. + */ + void insertAlertWhenServerCrash(@Param("alert") Alert alert); + } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertMapper.xml index 9be5c7c784a83dab7f4c6e34e60a5f95fdaa6a5a..40f538339dcfae9ee4cfe915d611b313bf10f22c 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertMapper.xml @@ -29,4 +29,13 @@ from t_ds_alert where alert_status = #{alertStatus} + + + insert into t_ds_alert(title, content, alert_status, log, alertgroup_id, create_time, update_time) + SELECT #{alert.title}, #{alert.content}, #{alert.alertStatus.code}, #{alert.log}, #{alert.alertGroupId}, + #{alert.createTime}, #{alert.updateTime} + from t_ds_alert + where content = #{alert.content} and alert_status = #{alert.alertStatus.code} + having count(*) = 0 + diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/AlertDaoTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/AlertDaoTest.java index 0137bd5639c53bdaacf8be3ea3299dbfb4457deb..7b9e8c6f35cb6aa8f44b3c9dfa9ae0b17243b4d9 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/AlertDaoTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/AlertDaoTest.java @@ -24,7 +24,9 @@ import java.util.List; import org.junit.Assert; import org.junit.Test; +import org.springframework.transaction.annotation.Transactional; +@Transactional public class AlertDaoTest { @Test @@ -42,4 +44,19 @@ public class AlertDaoTest { Assert.assertNotNull(alerts); Assert.assertNotEquals(0, alerts.size()); } + + @Test + public void testSendServerStopedAlert() { + AlertDao alertDao = DaoFactory.getDaoInstance(AlertDao.class); + int alertGroupId = 1; + String host = "127.0.0.998165432"; + String serverType = "Master"; + alertDao.sendServerStopedAlert(alertGroupId, host, serverType); + alertDao.sendServerStopedAlert(alertGroupId, host, serverType); + long count = alertDao.listWaitExecutionAlert() + .stream() + .filter(alert -> alert.getContent().contains(host)) + .count(); + Assert.assertEquals(1L, count); + } }