未验证 提交 bcfc3d93 编写于 作者: journey2018's avatar journey2018 提交者: GitHub

Merge pull request #298 from qiaozhanwei/branch-1.0.2

Delete the process instance and delete the corresponding queue information
......@@ -22,6 +22,8 @@ import cn.escheduler.api.utils.Constants;
import cn.escheduler.api.utils.Result;
import cn.escheduler.common.enums.ExecutionStatus;
import cn.escheduler.common.enums.Flag;
import cn.escheduler.common.queue.ITaskQueue;
import cn.escheduler.common.queue.TaskQueueFactory;
import cn.escheduler.common.utils.ParameterUtils;
import cn.escheduler.dao.model.User;
import org.slf4j.Logger;
......@@ -189,7 +191,9 @@ public class ProcessInstanceController extends BaseController{
try{
logger.info("delete process instance by id, login user:{}, project name:{}, process instance id:{}",
loginUser.getUserName(), projectName, processInstanceId);
Map<String, Object> result = processInstanceService.deleteProcessInstanceById(loginUser, projectName, processInstanceId);
// task queue
ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
Map<String, Object> result = processInstanceService.deleteProcessInstanceById(loginUser, projectName, processInstanceId,tasksQueue);
return returnDataList(result);
}catch (Exception e){
logger.error(DELETE_PROCESS_INSTANCE_BY_ID_ERROR.getMsg(),e);
......
......@@ -30,6 +30,8 @@ import cn.escheduler.common.graph.DAG;
import cn.escheduler.common.model.TaskNode;
import cn.escheduler.common.model.TaskNodeRelation;
import cn.escheduler.common.process.Property;
import cn.escheduler.common.queue.ITaskQueue;
import cn.escheduler.common.queue.TaskQueueFactory;
import cn.escheduler.common.utils.CollectionUtils;
import cn.escheduler.common.utils.DateUtils;
import cn.escheduler.common.utils.JSONUtils;
......@@ -446,13 +448,13 @@ public class ProcessInstanceService extends BaseDAGService {
/**
* delete process instance by id, at the same time,delete task instance and their mapping relation data
*
* @param loginUser
* @param projectName
* @param workflowId
* @param processInstanceId
* @param tasksQueue
* @return
*/
public Map<String, Object> deleteProcessInstanceById(User loginUser, String projectName, Integer workflowId) {
public Map<String, Object> deleteProcessInstanceById(User loginUser, String projectName, Integer processInstanceId,ITaskQueue tasksQueue) {
Map<String, Object> result = new HashMap<>(5);
Project project = projectMapper.queryByName(projectName);
......@@ -462,17 +464,34 @@ public class ProcessInstanceService extends BaseDAGService {
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
ProcessInstance processInstance = processDao.findProcessInstanceDetailById(workflowId);
ProcessInstance processInstance = processDao.findProcessInstanceDetailById(processInstanceId);
List<TaskInstance> taskInstanceList = processDao.findValidTaskListByProcessId(processInstanceId);
//process instance priority
int processInstancePriority = processInstance.getProcessInstancePriority().ordinal();
if (processInstance == null) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, workflowId);
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
int delete = processDao.deleteWorkProcessInstanceById(workflowId);
processDao.deleteAllSubWorkProcessByParentId(workflowId);
processDao.deleteWorkProcessMapByParentId(workflowId);
int delete = processDao.deleteWorkProcessInstanceById(processInstanceId);
processDao.deleteAllSubWorkProcessByParentId(processInstanceId);
processDao.deleteWorkProcessMapByParentId(processInstanceId);
if (delete > 0) {
if (CollectionUtils.isNotEmpty(taskInstanceList)){
for (TaskInstance taskInstance : taskInstanceList){
// task instance priority
int taskInstancePriority = taskInstance.getTaskInstancePriority().ordinal();
String nodeValue=processInstancePriority + "_" + processInstanceId + "_" +taskInstancePriority + "_" + taskInstance.getId();
try {
logger.info("delete task queue node : {}",nodeValue);
tasksQueue.removeNode(cn.escheduler.common.Constants.SCHEDULER_TASKS_QUEUE, nodeValue);
}catch (Exception e){
logger.error("delete task queue node : {}", nodeValue);
}
}
}
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR);
......@@ -489,6 +508,8 @@ public class ProcessInstanceService extends BaseDAGService {
* @return
*/
public Map<String, Object> batchDeleteProcessInstanceByIds(User loginUser, String projectName, String processInstanceIds) {
// task queue
ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
Map<String, Object> result = new HashMap<>(5);
List<Integer> deleteFailedIdList = new ArrayList<Integer>();
......@@ -507,7 +528,7 @@ public class ProcessInstanceService extends BaseDAGService {
for (String strProcessInstanceId:processInstanceIdArray) {
int processInstanceId = Integer.parseInt(strProcessInstanceId);
try {
deleteProcessInstanceById(loginUser, projectName, processInstanceId);
deleteProcessInstanceById(loginUser, projectName, processInstanceId,tasksQueue);
} catch (Exception e) {
deleteFailedIdList.add(processInstanceId);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册