未验证 提交 5592e7bb 编写于 作者: W wind 提交者: GitHub

[Bug-7788][MasterServer] fix submit duplicate tasks sometimes when retry (#7808) (#7866)

* [Bug-7788] fix submit duplicate tasks sometimes when retry

* add exist check when add task to standby list

* update

* put queue contain judge first
Co-authored-by: Ncaishunfeng <534328519@qq.com>
Co-authored-by: Ncaishunfeng <534328519@qq.com>
上级 1d3f77be
......@@ -76,6 +76,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
......@@ -1171,13 +1172,35 @@ public class WorkflowExecuteThread implements Runnable {
* @param taskInstance task instance
*/
private void addTaskToStandByList(TaskInstance taskInstance) {
logger.info("add task to stand by list, task name: {} , task id:{}", taskInstance.getName(), taskInstance.getId());
try {
if (!readyToSubmitTaskQueue.contains(taskInstance)) {
readyToSubmitTaskQueue.put(taskInstance);
if (readyToSubmitTaskQueue.contains(taskInstance)) {
logger.warn("task was found in ready submit queue, task code:{}", taskInstance.getTaskCode());
return;
}
// need to check if the tasks with same task code is active
boolean active = false;
Map<Integer, TaskInstance> taskInstanceMap = taskInstanceHashMap.column(taskInstance.getTaskCode());
if (taskInstanceMap != null && taskInstanceMap.size() > 0) {
for (Entry<Integer, TaskInstance> entry : taskInstanceMap.entrySet()) {
Integer taskInstanceId = entry.getKey();
if (activeTaskProcessorMaps.containsKey(taskInstanceId)) {
TaskInstance latestTaskInstance = processService.findTaskInstanceById(taskInstanceId);
if (latestTaskInstance != null && !latestTaskInstance.getState().typeIsFailure()) {
active = true;
break;
}
}
}
}
if (active) {
logger.warn("task was found in active task list, task code:{}", taskInstance.getTaskCode());
return;
}
logger.info("add task to stand by list, task name:{}, task id:{}, task code:{}",
taskInstance.getName(), taskInstance.getId(), taskInstance.getTaskCode());
readyToSubmitTaskQueue.put(taskInstance);
} catch (Exception e) {
logger.error("add task instance to readyToSubmitTaskQueue, taskName: {}, task id: {}", taskInstance.getName(), taskInstance.getId(), e);
logger.error("add task instance to readyToSubmitTaskQueue, taskName:{}, task id:{}", taskInstance.getName(), taskInstance.getId(), e);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册