未验证 提交 4893bef5 编写于 作者: L longtb 提交者: GitHub

[Improvement][TaskInstance] reduce database queries (#11522)

* [Improvement][TaskInstance] reduce database queries

* Update dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
Co-authored-by: Ncaishunfeng <caishunfeng2021@gmail.com>

* [Improvement][TaskInstance] queryByInstanceIdsAndCodes -> queryByProcessInstanceIdsAndTaskCodes
Co-authored-by: Nzhangshunmin <zhangshunmin@kezaihui.com>
Co-authored-by: Ncaishunfeng <caishunfeng2021@gmail.com>
上级 3f2ca7bc
......@@ -114,7 +114,6 @@ import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
......@@ -722,9 +721,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationLogMapper
.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
if (taskRelationList.size() == processTaskRelationLogList.size()) {
Set<ProcessTaskRelationLog> taskRelationSet = taskRelationList.stream().collect(Collectors.toSet());
Set<ProcessTaskRelationLog> processTaskRelationLogSet =
processTaskRelationLogList.stream().collect(Collectors.toSet());
Set<ProcessTaskRelationLog> taskRelationSet = new HashSet<>(taskRelationList);
Set<ProcessTaskRelationLog> processTaskRelationLogSet = new HashSet<>(processTaskRelationLogList);
if (taskRelationSet.size() == processTaskRelationLogSet.size()) {
taskRelationSet.removeAll(processTaskRelationLogSet);
if (!taskRelationSet.isEmpty()) {
......@@ -1047,7 +1045,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
@Override
@Transactional
public Map<String, Object> importProcessDefinition(User loginUser, long projectCode, MultipartFile file) {
Map<String, Object> result = new HashMap<>();
Map<String, Object> result;
String dagDataScheduleJson = FileUtils.file2String(file);
List<DagDataSchedule> dagDataScheduleList = JSONUtils.toList(dagDataScheduleJson, DagDataSchedule.class);
Project project = projectMapper.queryByCode(projectCode);
......@@ -1658,7 +1656,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
*/
@Override
public Map<String, Object> viewTree(User loginUser, long projectCode, long code, Integer limit) {
Map<String, Object> result = new HashMap<>();
Map<String, Object> result;
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
result = projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_TREE_VIEW);
......@@ -1716,9 +1714,17 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
while (!ServerLifeCycleManager.isStopped()) {
Set<String> postNodeList;
Iterator<Map.Entry<String, List<TreeViewDto>>> iter = runningNodeMap.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, List<TreeViewDto>> en = iter.next();
Set<Map.Entry<String, List<TreeViewDto>>> entries = runningNodeMap.entrySet();
List<Integer> processInstanceIds = processInstanceList.stream()
.limit(limit).map(ProcessInstance::getId).collect(Collectors.toList());
List<Long> nodeCodes = entries.stream().map(e -> Long.parseLong(e.getKey())).collect(Collectors.toList());
List<TaskInstance> taskInstances;
if (processInstanceIds.isEmpty() || nodeCodes.isEmpty()) {
taskInstances = Collections.emptyList();
} else {
taskInstances = taskInstanceMapper.queryByProcessInstanceIdsAndTaskCodes(processInstanceIds, nodeCodes);
}
for (Map.Entry<String, List<TreeViewDto>> en : entries) {
String nodeCode = en.getKey();
parentTreeViewDtoList = en.getValue();
......@@ -1730,8 +1736,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
// set treeViewDto instances
for (int i = limit - 1; i >= 0; i--) {
ProcessInstance processInstance = processInstanceList.get(i);
TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndCode(processInstance.getId(),
Long.parseLong(nodeCode));
TaskInstance taskInstance = null;
for (TaskInstance instance : taskInstances) {
if (instance.getTaskCode() == Long.parseLong(nodeCode)
&& instance.getProcessInstanceId() == processInstance.getId()) {
taskInstance = instance;
break;
}
}
if (taskInstance == null) {
treeViewDto.getInstances().add(new Instance(-1, "not running", 0, "null"));
} else {
......
......@@ -780,24 +780,36 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
ganttDto.setTaskNames(nodeList);
List<Task> taskList = new ArrayList<>();
for (String node : nodeList) {
TaskInstance taskInstance =
taskInstanceMapper.queryByInstanceIdAndCode(processInstanceId, Long.parseLong(node));
if (taskInstance == null) {
continue;
if (!nodeList.isEmpty()) {
List<Long> taskCodes = nodeList.stream().map(Long::parseLong).collect(Collectors.toList());
List<TaskInstance> taskInstances = taskInstanceMapper.queryByProcessInstanceIdsAndTaskCodes(
Collections.singletonList(processInstanceId), taskCodes
);
for (String node : nodeList) {
TaskInstance taskInstance = null;
for (TaskInstance instance : taskInstances) {
if (instance.getProcessInstanceId() == processInstanceId
&& instance.getTaskCode() == Long.parseLong(node)) {
taskInstance = instance;
break;
}
}
if (taskInstance == null) {
continue;
}
Date startTime = taskInstance.getStartTime() == null ? new Date() : taskInstance.getStartTime();
Date endTime = taskInstance.getEndTime() == null ? new Date() : taskInstance.getEndTime();
Task task = new Task();
task.setTaskName(taskInstance.getName());
task.getStartDate().add(startTime.getTime());
task.getEndDate().add(endTime.getTime());
task.setIsoStart(startTime);
task.setIsoEnd(endTime);
task.setStatus(taskInstance.getState().toString());
task.setExecutionDate(taskInstance.getStartTime());
task.setDuration(DateUtils.format2Readable(endTime.getTime() - startTime.getTime()));
taskList.add(task);
}
Date startTime = taskInstance.getStartTime() == null ? new Date() : taskInstance.getStartTime();
Date endTime = taskInstance.getEndTime() == null ? new Date() : taskInstance.getEndTime();
Task task = new Task();
task.setTaskName(taskInstance.getName());
task.getStartDate().add(startTime.getTime());
task.getEndDate().add(endTime.getTime());
task.setIsoStart(startTime);
task.setIsoEnd(endTime);
task.setStatus(taskInstance.getState().toString());
task.setExecutionDate(taskInstance.getStartTime());
task.setDuration(DateUtils.format2Readable(endTime.getTime() - startTime.getTime()));
taskList.add(task);
}
ganttDto.setTasks(taskList);
......
......@@ -53,6 +53,9 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
TaskInstance queryByInstanceIdAndCode(@Param("processInstanceId") int processInstanceId,
@Param("taskCode") Long taskCode);
List<TaskInstance> queryByProcessInstanceIdsAndTaskCodes(@Param("processInstanceIds") List<Integer> processInstanceIds,
@Param("taskCodes") List<Long> taskCodes);
Integer countTask(@Param("projectCodes") Long[] projectCodes,
@Param("taskIds") int[] taskIds);
......
......@@ -134,6 +134,24 @@
and flag = 1
limit 1
</select>
<select id="queryByProcessInstanceIdsAndTaskCodes" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSql"/>
from t_ds_task_instance
where flag = 1
<if test="processInstanceIds != null and processInstanceIds.size() != 0">
and process_instance_id in
<foreach collection="processInstanceIds" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</if>
<if test="taskCodes != null and taskCodes.size() != 0">
and task_code in
<foreach collection="taskCodes" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</if>
</select>
<select id="countTask" resultType="java.lang.Integer">
select count(1) as count
from t_ds_task_instance task,t_ds_task_definition_log define
......
......@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import java.util.Collections;
import java.util.Date;
import java.util.List;
......@@ -278,6 +279,26 @@ public class TaskInstanceMapperTest extends BaseDaoTest {
Assert.assertNotEquals(taskInstance, null);
}
/**
* test query by process instance ids and task codes
*/
@Test
public void testQueryByProcessInstanceIdsAndTaskCodes() {
// insert ProcessInstance
ProcessInstance processInstance = insertProcessInstance();
// insert taskInstance
TaskInstance task = insertTaskInstance(processInstance.getId());
task.setHost("111.111.11.11");
taskInstanceMapper.updateById(task);
List<TaskInstance> taskInstances = taskInstanceMapper.queryByProcessInstanceIdsAndTaskCodes(
Collections.singletonList(task.getProcessInstanceId()),
Collections.singletonList(task.getTaskCode()));
taskInstanceMapper.deleteById(task.getId());
Assert.assertEquals(taskInstances.size(), 1);
}
/**
* test count task instance
*/
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册