diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessDefinitionService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessDefinitionService.java index 119c840e564957e45b77f330fa3605ad4795ed38..97ac965ff17115cf463cf844c2f18448926b6687 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessDefinitionService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessDefinitionService.java @@ -125,6 +125,7 @@ public class ProcessDefinitionService extends BaseDAGService { processDefine.setDesc(desc); processDefine.setLocations(locations); processDefine.setConnects(connects); + processDefine.setTimeout(processData.getTimeout()); //custom global params List globalParamsList = processData.getGlobalParams(); @@ -288,6 +289,7 @@ public class ProcessDefinitionService extends BaseDAGService { processDefine.setDesc(desc); processDefine.setLocations(locations); processDefine.setConnects(connects); + processDefine.setTimeout(processData.getTimeout()); //custom global params List globalParamsList = processData.getGlobalParams(); diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java index 54719f90f4b182e8c7250fa653eb8c44a2d265a1..43d5377da2974b253e3ea1418f748d95f9a33374 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java @@ -346,7 +346,8 @@ public class ProcessInstanceService extends BaseDAGService { //check process instance status if (!processInstance.getState().typeIsFinished()) { - putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, "update"); + putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, + processInstance.getName(), processInstance.getState().toString(), "update"); return result; } Date schedule = null; @@ -355,8 +356,12 @@ public class ProcessInstanceService extends BaseDAGService { } else { schedule = processInstance.getScheduleTime(); } + processInstance.setScheduleTime(schedule); + processInstance.setLocations(locations); + processInstance.setConnects(connects); String globalParams = null; String originDefParams = null; + int timeout = processInstance.getTimeout(); if (StringUtils.isNotEmpty(processInstanceJson)) { ProcessData processData = JSONUtils.parseObject(processInstanceJson, ProcessData.class); //check workflow json is valid @@ -370,9 +375,14 @@ public class ProcessInstanceService extends BaseDAGService { Map globalParamMap = globalParamList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)); globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, processInstance.getCmdTypeIfComplement(), schedule); - } - int update = processDao.updateProcessInstance(processInstanceId, processInstanceJson, - globalParams, schedule, flag, locations, connects); + timeout = processData.getTimeout(); + processInstance.setTimeout(timeout); + processInstance.setProcessInstanceJson(processInstanceJson); + processInstance.setGlobalParams(globalParams); + } +// int update = processDao.updateProcessInstance(processInstanceId, processInstanceJson, +// globalParams, schedule, flag, locations, connects); + int update = processDao.updateProcessInstance(processInstance); int updateDefine = 1; if (syncDefine && StringUtils.isNotEmpty(processInstanceJson)) { ProcessDefinition processDefinition = processDao.findProcessDefineById(processInstance.getProcessDefinitionId()); @@ -380,6 +390,7 @@ public class ProcessInstanceService extends BaseDAGService { processDefinition.setGlobalParams(originDefParams); processDefinition.setLocations(locations); processDefinition.setConnects(connects); + processDefinition.setTimeout(timeout); updateDefine = processDefineMapper.update(processDefinition); } if (update > 0 && updateDefine > 0) { diff --git a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java index e3c56b8a9c380d2a2255c21867ba1171365df7e7..abf29fdc99c1f5817bdd2f1939675130fab011e7 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java @@ -331,6 +331,11 @@ public final class Constants { */ public static final int MAX_TASK_TIMEOUT = 24 * 3600; + /** + * max task timeout + */ + public static final int MAX_PROCESS_TIMEOUT = Integer.MAX_VALUE; + /** * heartbeat threads number diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/AlertDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/AlertDao.java index dcc87eee80caef6f4f0d671c9c7724721893b5de..bf7ac14c27e4f8760d41ac9783bc0b55bbf7d92a 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/AlertDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/AlertDao.java @@ -23,6 +23,8 @@ import cn.escheduler.dao.datasource.ConnectionFactory; import cn.escheduler.dao.mapper.AlertMapper; import cn.escheduler.dao.mapper.UserAlertGroupMapper; import cn.escheduler.dao.model.Alert; +import cn.escheduler.dao.model.ProcessDefinition; +import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.User; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -83,8 +85,9 @@ public class AlertDao extends AbstractBaseDao { */ public void sendServerStopedAlert(int alertgroupId,String host,String serverType){ Alert alert = new Alert(); - String content = String.format("[{'type':'%s','host':'%s','event':'服务挂掉','警告级别':'严重'}]",serverType,host); - alert.setTitle("容错告警"); + String content = String.format("[{'type':'%s','host':'%s','event':'server down','warning level':'serious'}]", + serverType, host); + alert.setTitle("Fault tolerance warning"); alert.setShowType(ShowType.TABLE); alert.setContent(content); alert.setAlertType(AlertType.EMAIL); @@ -94,6 +97,34 @@ public class AlertDao extends AbstractBaseDao { alertMapper.insert(alert); } + /** + * process time out alert + * @param processInstance + * @param processDefinition + */ + public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProcessDefinition processDefinition){ + int alertgroupId = processInstance.getWarningGroupId(); + String receivers = processDefinition.getReceivers(); + String receiversCc = processDefinition.getReceiversCc(); + Alert alert = new Alert(); + String content = String.format("[{'id':'%d','name':'%s','event':'timeout','warnLevel':'middle'}]", + processInstance.getId(), processInstance.getName()); + alert.setTitle("Process Timeout Warn"); + alert.setShowType(ShowType.TABLE); + alert.setContent(content); + alert.setAlertType(AlertType.EMAIL); + alert.setAlertGroupId(alertgroupId); + if (StringUtils.isNotEmpty(receivers)) { + alert.setReceivers(receivers); + } + if (StringUtils.isNotEmpty(receiversCc)) { + alert.setReceiversCc(receiversCc); + } + alert.setCreateTime(new Date()); + alert.setUpdateTime(new Date()); + alertMapper.insert(alert); + } + /** * task timeout warn */ diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java index bb48d8e523a5d29819bcf0316bcc62cf35b4f39d..b18fb5e974963228e18d17b0bd678e9efadfa930 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java @@ -482,6 +482,7 @@ public class ProcessDao extends AbstractBaseDao { // set process instance priority processInstance.setProcessInstancePriority(command.getProcessInstancePriority()); processInstance.setWorkerGroupId(command.getWorkerGroupId()); + processInstance.setTimeout(processDefinition.getTimeout()); return processInstance; } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessDefinitionMapper.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessDefinitionMapper.java index 0484b7cebe9a49255b95b6b9665d365bf539d815..b750b9dbcf55092a2a0056b8cd0466ed5783d77c 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessDefinitionMapper.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessDefinitionMapper.java @@ -94,6 +94,7 @@ public interface ProcessDefinitionMapper { @Result(property = "locations", column = "locations", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "connects", column = "connects", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "projectName", column = "project_name", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "receivers", column = "receivers", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "receiversCc", column = "receivers_cc", javaType = String.class, jdbcType = JdbcType.VARCHAR) @@ -121,6 +122,7 @@ public interface ProcessDefinitionMapper { @Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "locations", column = "locations", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "connects", column = "connects", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "projectName", column = "project_name", javaType = String.class, jdbcType = JdbcType.VARCHAR) }) @SelectProvider(type = ProcessDefinitionMapperProvider.class, method = "queryByDefineName") @@ -157,6 +159,7 @@ public interface ProcessDefinitionMapper { @Result(property = "updateTime", column = "update_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE), @Result(property = "flag", column = "flag", typeHandler = EnumOrdinalTypeHandler.class, javaType = Flag.class, jdbcType = JdbcType.TINYINT), @Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "projectName", column = "project_name", javaType = String.class, jdbcType = JdbcType.VARCHAR) }) @SelectProvider(type = ProcessDefinitionMapperProvider.class, method = "queryAllDefinitionList") @@ -183,6 +186,7 @@ public interface ProcessDefinitionMapper { @Result(property = "flag", column = "flag", typeHandler = EnumOrdinalTypeHandler.class, javaType = Flag.class, jdbcType = JdbcType.TINYINT), @Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "scheduleReleaseState", column = "schedule_release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT), + @Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "projectName", column = "project_name", javaType = String.class, jdbcType = JdbcType.VARCHAR) }) @SelectProvider(type = ProcessDefinitionMapperProvider.class, method = "queryDefineListPaging") @@ -211,6 +215,7 @@ public interface ProcessDefinitionMapper { @Result(property = "locations", column = "locations", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "connects", column = "connects", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "projectName", column = "project_name", javaType = String.class, jdbcType = JdbcType.VARCHAR) }) @SelectProvider(type = ProcessDefinitionMapperProvider.class, method = "queryDefinitionListByIdList") diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessDefinitionMapperProvider.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessDefinitionMapperProvider.java index cefbe14bbdbfa38d8c4edd3015a6ee792243601b..192e46aec23f0804714850b6b7cb0070043b5342 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessDefinitionMapperProvider.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessDefinitionMapperProvider.java @@ -55,6 +55,7 @@ public class ProcessDefinitionMapperProvider { VALUES("`connects`", "#{processDefinition.connects}"); VALUES("`create_time`", "#{processDefinition.createTime}"); VALUES("`update_time`", "#{processDefinition.updateTime}"); + VALUES("`timeout`", "#{processDefinition.timeout}"); VALUES("`flag`", EnumFieldUtil.genFieldStr("processDefinition.flag", ReleaseState.class)); VALUES("`user_id`", "#{processDefinition.userId}"); @@ -100,6 +101,7 @@ public class ProcessDefinitionMapperProvider { SET("`global_params`=#{processDefinition.globalParams}"); SET("`create_time`=#{processDefinition.createTime}"); SET("`update_time`=#{processDefinition.updateTime}"); + SET("`timeout`=#{processDefinition.timeout}"); SET("`flag`="+EnumFieldUtil.genFieldStr("processDefinition.flag", Flag.class)); SET("`user_id`=#{processDefinition.userId}"); @@ -173,7 +175,7 @@ public class ProcessDefinitionMapperProvider { */ public String queryDefineListPaging(Map parameter) { return new SQL() {{ - SELECT("td.id,td.name,td.version,td.release_state,td.project_id,td.user_id,td.`desc`,td.create_time,td.update_time,td.flag,td.global_params,td.receivers,td.receivers_cc,sc.schedule_release_state"); + SELECT("td.*,sc.schedule_release_state"); FROM(TABLE_NAME + " td"); LEFT_OUTER_JOIN(" (select process_definition_id,release_state as schedule_release_state from `t_escheduler_schedules` " + "group by `process_definition_id`,`release_state`) sc on sc.process_definition_id = td.id"); diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapper.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapper.java index b3f9f10b8bbf7bf78db633f89ff323863ec0e5a4..03fbd6db2712c59e19d6e7070cf24bae29ce0aea 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapper.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapper.java @@ -95,6 +95,7 @@ public interface ProcessInstanceMapper { @Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT), @Result(property = "tenantCode", column = "tenant_code", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), + @Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryDetailById") @@ -133,6 +134,7 @@ public interface ProcessInstanceMapper { @Result(property = "historyCmd", column = "history_cmd", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), + @Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryById") @@ -171,6 +173,7 @@ public interface ProcessInstanceMapper { @Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), + @Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @@ -209,6 +212,7 @@ public interface ProcessInstanceMapper { @Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), + @Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @@ -256,6 +260,7 @@ public interface ProcessInstanceMapper { @Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT), @Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), + @Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @@ -352,6 +357,7 @@ public interface ProcessInstanceMapper { @Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT), @Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), + @Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @@ -444,6 +450,7 @@ public interface ProcessInstanceMapper { @Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT), @Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), + @Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @@ -488,6 +495,7 @@ public interface ProcessInstanceMapper { @Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT), @Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), + @Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @@ -532,6 +540,7 @@ public interface ProcessInstanceMapper { @Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), + @Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @@ -574,6 +583,7 @@ public interface ProcessInstanceMapper { @Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), + @Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryLastRunningProcess") @@ -616,6 +626,7 @@ public interface ProcessInstanceMapper { @Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), + @Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryLastManualProcess") diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java index 92df768732bb739931197cf66e7be6eaf29cc177..cd9daa3781fb2f611d14507e304a2789821f336e 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java @@ -68,6 +68,7 @@ public class ProcessInstanceMapperProvider { VALUES("`is_sub_process`", EnumFieldUtil.genFieldStr("processInstance.isSubProcess", Flag.class)); VALUES("`executor_id`", "#{processInstance.executorId}"); VALUES("`worker_group_id`", "#{processInstance.workerGroupId}"); + VALUES("`timeout`", "#{processInstance.timeout}"); VALUES("`process_instance_priority`", EnumFieldUtil.genFieldStr("processInstance.processInstancePriority", Priority.class)); } }.toString(); @@ -141,6 +142,7 @@ public class ProcessInstanceMapperProvider { SET("`is_sub_process`="+EnumFieldUtil.genFieldStr("processInstance.isSubProcess", Flag.class)); SET("`executor_id`=#{processInstance.executorId}"); SET("`worker_group_id`=#{processInstance.workerGroupId}"); + SET("`timeout`=#{processInstance.timeout}"); WHERE("`id`=#{processInstance.id}"); diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessData.java b/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessData.java index ba967f8be4d81e87e5a8b3cc1f33b11c8c53fc3b..0623144142136833679aa877cbcc2b58b6195606 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessData.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessData.java @@ -37,6 +37,9 @@ public class ProcessData { private List globalParams; + private int timeout; + + public ProcessData() { } @@ -82,4 +85,11 @@ public class ProcessData { this.globalParams = globalParams; } + public int getTimeout() { + return timeout; + } + + public void setTimeout(int timeout) { + this.timeout = timeout; + } } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessDefinition.java b/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessDefinition.java index 51498f5ddf38c42a0ff2f1d06c72d13d40c204a8..fb0c1d0f9849ffcd23e703c3e774f5ebecb57e34 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessDefinition.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessDefinition.java @@ -136,6 +136,11 @@ public class ProcessDefinition { */ private ReleaseState scheduleReleaseState; + /** + * process warning time out. unit: minute + */ + private int timeout; + public String getName() { return name; @@ -316,6 +321,14 @@ public class ProcessDefinition { this.scheduleReleaseState = scheduleReleaseState; } + public int getTimeout() { + return timeout; + } + + public void setTimeout(int timeout) { + this.timeout = timeout; + } + @Override public String toString() { return "ProcessDefinition{" + @@ -340,6 +353,8 @@ public class ProcessDefinition { ", receivers='" + receivers + '\'' + ", receiversCc='" + receiversCc + '\'' + ", scheduleReleaseState=" + scheduleReleaseState + + ", timeout=" + timeout + '}'; } + } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java b/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java index b37450c330f33167de1085d3ae918a1ee4e461ea..f156752b9889693856b4581c87a92cddb161dec8 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java @@ -183,6 +183,11 @@ public class ProcessInstance { */ private int workerGroupId; + /** + * process timeout for warning + */ + private int timeout; + public ProcessInstance(){ } @@ -495,6 +500,14 @@ public class ProcessInstance { this.workerGroupId = workerGroupId; } + public int getTimeout() { + return timeout; + } + + public void setTimeout(int timeout) { + this.timeout = timeout; + } + @Override public String toString() { return "ProcessInstance{" + @@ -528,7 +541,9 @@ public class ProcessInstance { ", historyCmd='" + historyCmd + '\'' + ", dependenceScheduleTimes='" + dependenceScheduleTimes + '\'' + ", duration=" + duration + + ", timeout=" + timeout + ", processInstancePriority=" + processInstancePriority + '}'; } + } diff --git a/escheduler-dao/src/main/resources/dao/data_source.properties b/escheduler-dao/src/main/resources/dao/data_source.properties index 3c89dd1fd2f9488886f01a1a0808f312d8b63400..cac3aa5e201c56bbffe8ead05140a34f44d23544 100644 --- a/escheduler-dao/src/main/resources/dao/data_source.properties +++ b/escheduler-dao/src/main/resources/dao/data_source.properties @@ -1,9 +1,9 @@ # base spring data source configuration spring.datasource.type=com.alibaba.druid.pool.DruidDataSource spring.datasource.driver-class-name=com.mysql.jdbc.Driver -spring.datasource.url=jdbc:mysql://192.168.220.188:3306/escheduler_new?characterEncoding=UTF-8 -spring.datasource.username=root -spring.datasource.password=root@123 +spring.datasource.url=jdbc:mysql://192.168.xx.xx:3306/escheduler?characterEncoding=UTF-8 +spring.datasource.username=xx +spring.datasource.password=xx # connection configuration spring.datasource.initialSize=5 diff --git a/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java b/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java index b5ed0053a21334447a4061910256fc2c5510a7f7..ae170b8925855b1a14c551ff54094cee869d2bd4 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java @@ -258,7 +258,7 @@ public class MasterExecThread implements Runnable { processDao.createRecoveryWaitingThreadCommand(null, processInstance); } List taskInstances = processDao.findValidTaskListByProcessId(processInstance.getId()); - alertManager.sendWarnningOfProcessInstance(processInstance, taskInstances); + alertManager.sendAlertProcessInstance(processInstance, taskInstances); } @@ -775,8 +775,15 @@ public class MasterExecThread implements Runnable { private void runProcess(){ // submit start node submitPostNode(null); - // submitStandByTask(); + boolean sendTimeWarning = false; while(!processInstance.IsProcessInstanceStop()){ + + // send warning email if process time out. + if( !sendTimeWarning && checkProcessTimeOut(processInstance) ){ + alertManager.sendProcessTimeoutAlert(processInstance, + processDao.findProcessDefineById(processInstance.getProcessDefinitionId())); + sendTimeWarning = true; + } Set keys = activeTaskNode.keySet(); for (MasterBaseTaskExecThread taskExecThread : keys) { Future future = activeTaskNode.get(taskExecThread); @@ -821,7 +828,7 @@ public class MasterExecThread implements Runnable { } // send alert if(this.recoverToleranceFaultTaskList.size() > 0){ - alertManager.sendWarnningWorkerleranceFault(processInstance, recoverToleranceFaultTaskList); + alertManager.sendAlertWorkerToleranceFault(processInstance, recoverToleranceFaultTaskList); this.recoverToleranceFaultTaskList.clear(); } // updateProcessInstance completed task status @@ -851,6 +858,25 @@ public class MasterExecThread implements Runnable { logger.info("process:{} end, state :{}", processInstance.getId(), processInstance.getState()); } + /** + * check process time out + * @param processInstance + * @return + */ + private boolean checkProcessTimeOut(ProcessInstance processInstance) { + if(processInstance.getTimeout() == 0 ){ + return false; + } + + Date now = new Date(); + long runningTime = DateUtils.differMs(now, processInstance.getStartTime()); + + if(runningTime > processInstance.getTimeout()){ + return true; + } + return false; + } + private boolean canSubmitTaskToQueue() { return OSUtils.checkResource(conf, true); } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/utils/AlertManager.java b/escheduler-server/src/main/java/cn/escheduler/server/utils/AlertManager.java index ca478096bd5be6728cf09dffa58b764cc5cf93d0..fc62bcf73d4181f7581c984b3d0a5bd28f52c458 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/utils/AlertManager.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/utils/AlertManager.java @@ -26,6 +26,7 @@ import cn.escheduler.common.utils.JSONUtils; import cn.escheduler.dao.AlertDao; import cn.escheduler.dao.DaoFactory; import cn.escheduler.dao.model.Alert; +import cn.escheduler.dao.model.ProcessDefinition; import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.TaskInstance; import org.slf4j.Logger; @@ -54,27 +55,27 @@ public class AlertManager { private String getCommandCnName(CommandType commandType) { switch (commandType) { case RECOVER_TOLERANCE_FAULT_PROCESS: - return "恢复容错"; + return "recover tolerance fault process"; case RECOVER_SUSPENDED_PROCESS: - return "恢复暂停流程"; + return "recover suspended process"; case START_CURRENT_TASK_PROCESS: - return "从当前节点开始执行"; + return "start current task process"; case START_FAILURE_TASK_PROCESS: - return "从失败节点开始执行"; + return "start failure task process"; case START_PROCESS: - return "启动工作流"; + return "start process"; case REPEAT_RUNNING: - return "重跑"; + return "repeat running"; case SCHEDULER: - return "定时执行"; + return "scheduler"; case COMPLEMENT_DATA: - return "补数"; + return "complement data"; case PAUSE: - return "暂停工作流"; + return "pause"; case STOP: - return "停止工作流"; + return "stop"; default: - return "未知的命令类型"; + return "unknown type"; } } @@ -124,14 +125,14 @@ public class AlertManager { continue; } LinkedHashMap failedTaskMap = new LinkedHashMap(); - failedTaskMap.put("任务id", String.valueOf(task.getId())); - failedTaskMap.put("任务名称", task.getName()); - failedTaskMap.put("任务类型", task.getTaskType()); - failedTaskMap.put("任务状态", task.getState().toString()); - failedTaskMap.put("任务开始时间", DateUtils.dateToString(task.getStartTime())); - failedTaskMap.put("任务结束时间", DateUtils.dateToString(task.getEndTime())); + failedTaskMap.put("task id", String.valueOf(task.getId())); + failedTaskMap.put("task name", task.getName()); + failedTaskMap.put("task type", task.getTaskType()); + failedTaskMap.put("task state", task.getState().toString()); + failedTaskMap.put("task start time", DateUtils.dateToString(task.getStartTime())); + failedTaskMap.put("task end time", DateUtils.dateToString(task.getEndTime())); failedTaskMap.put("host", task.getHost()); - failedTaskMap.put("日志路径", task.getLogPath()); + failedTaskMap.put("log path", task.getLogPath()); failedTaskList.add(failedTaskMap); } res = JSONUtils.toJson(failedTaskList); @@ -152,10 +153,10 @@ public class AlertManager { for(TaskInstance taskInstance: toleranceTaskList){ LinkedHashMap toleranceWorkerContentMap = new LinkedHashMap(); - toleranceWorkerContentMap.put("工作流程名称", processInstance.getName()); - toleranceWorkerContentMap.put("容错任务名称", taskInstance.getName()); - toleranceWorkerContentMap.put("容错机器IP", taskInstance.getHost()); - toleranceWorkerContentMap.put("任务失败次数", String.valueOf(taskInstance.getRetryTimes())); + toleranceWorkerContentMap.put("process name", processInstance.getName()); + toleranceWorkerContentMap.put("task name", taskInstance.getName()); + toleranceWorkerContentMap.put("host", taskInstance.getHost()); + toleranceWorkerContentMap.put("task retry times", String.valueOf(taskInstance.getRetryTimes())); toleranceTaskInstanceList.add(toleranceWorkerContentMap); } return JSONUtils.toJson(toleranceTaskInstanceList); @@ -166,9 +167,9 @@ public class AlertManager { * @param processInstance * @param toleranceTaskList */ - public void sendWarnningWorkerleranceFault(ProcessInstance processInstance, List toleranceTaskList){ + public void sendAlertWorkerToleranceFault(ProcessInstance processInstance, List toleranceTaskList){ Alert alert = new Alert(); - alert.setTitle("worker容错报警"); + alert.setTitle("worker fault tolerance"); alert.setShowType(ShowType.TABLE); String content = getWorkerToleranceContent(processInstance, toleranceTaskList); alert.setContent(content); @@ -187,8 +188,8 @@ public class AlertManager { * send process instance alert * @param processInstance */ - public void sendWarnningOfProcessInstance(ProcessInstance processInstance, - List taskInstances){ + public void sendAlertProcessInstance(ProcessInstance processInstance, + List taskInstances){ boolean sendWarnning = false; WarningType warningType = processInstance.getWarningType(); @@ -217,7 +218,7 @@ public class AlertManager { String cmdName = getCommandCnName(processInstance.getCommandType()); - String success = processInstance.getState().typeIsSuccess() ? "成功" :"失败"; + String success = processInstance.getState().typeIsSuccess() ? "success" :"failed"; alert.setTitle(cmdName + success); ShowType showType = processInstance.getState().typeIsSuccess() ? ShowType.TEXT : ShowType.TABLE; alert.setShowType(showType); @@ -233,4 +234,7 @@ public class AlertManager { logger.info("add alert to db , alert: {}", alert.toString()); } + public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProcessDefinition processDefinition) { + alertDao.sendProcessTimeoutAlert(processInstance, processDefinition); + } } diff --git a/escheduler-server/src/test/java/cn/escheduler/server/master/AlertManagerTest.java b/escheduler-server/src/test/java/cn/escheduler/server/master/AlertManagerTest.java index e6881a3afc66f244de4bb62613c8ef67f09eb623..14c46b7d0c24acdfe83899cd3199554f081b911a 100644 --- a/escheduler-server/src/test/java/cn/escheduler/server/master/AlertManagerTest.java +++ b/escheduler-server/src/test/java/cn/escheduler/server/master/AlertManagerTest.java @@ -76,7 +76,7 @@ public class AlertManagerTest { toleranceTaskList.add(toleranceTask1); toleranceTaskList.add(toleranceTask2); - alertManager.sendWarnningWorkerleranceFault(processInstance, toleranceTaskList); + alertManager.sendAlertWorkerToleranceFault(processInstance, toleranceTaskList); } @@ -103,7 +103,7 @@ public class AlertManagerTest { toleranceTaskList.add(toleranceTask1); toleranceTaskList.add(toleranceTask2); - alertManager.sendWarnningOfProcessInstance(processInstance, toleranceTaskList); + alertManager.sendAlertProcessInstance(processInstance, toleranceTaskList); } } diff --git a/sql/upgrade/1.0.2_schema/mysql/escheduler_ddl.sql b/sql/upgrade/1.0.2_schema/mysql/escheduler_ddl.sql index 435a8993a213e1375575e3e1dde31f8292c4eabb..b75c362e0e2b30b88de4da49a86f1696c2e65547 100644 --- a/sql/upgrade/1.0.2_schema/mysql/escheduler_ddl.sql +++ b/sql/upgrade/1.0.2_schema/mysql/escheduler_ddl.sql @@ -200,4 +200,46 @@ d// delimiter ; CALL ac_escheduler_T_t_escheduler_process_instance_C_worker_group_id; -DROP PROCEDURE ac_escheduler_T_t_escheduler_process_instance_C_worker_group_id; \ No newline at end of file +DROP PROCEDURE ac_escheduler_T_t_escheduler_process_instance_C_worker_group_id; + + +-- ac_escheduler_T_t_escheduler_process_instance_C_timeout +drop PROCEDURE if EXISTS ac_escheduler_T_t_escheduler_process_instance_C_timeout; +delimiter d// +CREATE PROCEDURE ac_escheduler_T_t_escheduler_process_instance_C_timeout() + BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_escheduler_process_instance' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME='timeout') + THEN + ALTER TABLE `t_escheduler_process_instance` ADD COLUMN `timeout` int(11) NULL DEFAULT 0 COMMENT '超时时间' AFTER `worker_group_id`; + END IF; + END; + +d// + +delimiter ; +CALL ac_escheduler_T_t_escheduler_process_instance_C_timeout; +DROP PROCEDURE ac_escheduler_T_t_escheduler_process_instance_C_timeout; + + +-- ac_escheduler_T_t_escheduler_process_definition_C_timeout +drop PROCEDURE if EXISTS ac_escheduler_T_t_escheduler_process_definition_C_timeout; +delimiter d// +CREATE PROCEDURE ac_escheduler_T_t_escheduler_process_definition_C_timeout() + BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_escheduler_process_definition' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME='timeout') + THEN + ALTER TABLE `t_escheduler_process_definition` ADD COLUMN `timeout` int(11) NULL DEFAULT 0 COMMENT '超时时间' AFTER `create_time`; + END IF; + END; + +d// + +delimiter ; +CALL ac_escheduler_T_t_escheduler_process_definition_C_timeout; +DROP PROCEDURE ac_escheduler_T_t_escheduler_process_definition_C_timeout; \ No newline at end of file