未验证 提交 0c5754c3 编写于 作者: R RedemptionC 提交者: GitHub

add feature : query top n success process instance order by running duration (#3315)

* add feature : query top n success process instance order by running duration

* add feature : add params check

* add feature : remove code smell

* add feature : remove code smell

* add feature : add param check

* add feature : add param check
improve handleEscape method in ParameterUtils.java

* top n : add  java doc and format code
上级 5fe31546
......@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import io.swagger.annotations.*;
import org.slf4j.Logger;
......@@ -204,6 +205,39 @@ public class ProcessInstanceController extends BaseController {
return returnDataList(result);
}
/**
* query top n process instance order by running duration
*
* @param loginUser login user
* @param projectName project name
* @param size number of process instance
* @param startTime start time
* @param endTime end time
* @return list of process instance
*/
@ApiOperation(value = "queryTopNLongestRunningProcessInstance", notes = "QUERY_TOPN_LONGEST_RUNNING_PROCESS_INSTANCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "size", value = "PROCESS_INSTANCE_SIZE", dataType = "Int", example = "10"),
@ApiImplicitParam(name = "startTime", value = "PROCESS_INSTANCE_START_TIME", dataType = "String"),
@ApiImplicitParam(name = "endTime", value = "PROCESS_INSTANCE_END_TIME", dataType = "String"),
})
@GetMapping(value = "/top-n")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_PROCESS_INSTANCE_BY_ID_ERROR)
public Result<ProcessInstance> queryTopNLongestRunningProcessInstance(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName,
@RequestParam("size") Integer size,
@RequestParam(value = "startTime",required = true) String startTime,
@RequestParam(value = "endTime",required = true) String endTime
) {
projectName=ParameterUtils.handleEscapes(projectName);
logger.info("query top {} SUCCESS process instance order by running time whprojectNameich started between {} and {} ,login user:{},project name:{}", size, startTime, endTime,
loginUser.getUserName(), projectName);
Map<String,Object> result=processInstanceService.queryTopNLongestRunningProcessInstance(loginUser, projectName, size, startTime, endTime);
return returnDataList(result);
}
/**
* delete process instance by id, at the same time,
* delete task instance and their mapping relation data
......@@ -220,9 +254,9 @@ public class ProcessInstanceController extends BaseController {
@GetMapping(value = "/delete")
@ResponseStatus(HttpStatus.OK)
@ApiException(DELETE_PROCESS_INSTANCE_BY_ID_ERROR)
public Result deleteProcessInstanceById(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName,
@RequestParam("processInstanceId") Integer processInstanceId
public Result<ProcessInstance> deleteProcessInstanceById(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName,
@RequestParam("processInstanceId") Integer processInstanceId
) {
logger.info("delete process instance by id, login user:{}, project name:{}, process instance id:{}",
loginUser.getUserName(), projectName, processInstanceId);
......
......@@ -249,7 +249,8 @@ public enum Status {
COMMAND_STATE_COUNT_ERROR(80001,"task instance state count error", "查询各状态任务实例数错误"),
NEGTIVE_SIZE_NUMBER_ERROR(80002,"query size number error","查询size错误"),
START_TIME_BIGGER_THAN_END_TIME_ERROR(80003,"start time bigger than end time error","开始时间在结束时间之后错误"),
QUEUE_COUNT_ERROR(90001,"queue count error", "查询队列数据错误"),
KERBEROS_STARTUP_STATE(100001,"get kerberos startup state error", "获取kerberos启动状态错误"),
......
......@@ -98,6 +98,53 @@ public class ProcessInstanceService extends BaseDAGService {
@Autowired
UsersService usersService;
/**
* return top n SUCCESS process instance order by running time which started between startTime and endTime
* @param loginUser
* @param projectName
* @param size
* @param startTime
* @param endTime
* @return
*/
public Map<String, Object> queryTopNLongestRunningProcessInstance(User loginUser, String projectName, int size, String startTime, String endTime) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
if (0 > size) {
putMsg(result, Status.NEGTIVE_SIZE_NUMBER_ERROR, size);
return result;
}
if (Objects.isNull(startTime)) {
putMsg(result, Status.DATA_IS_NULL, Constants.START_TIME);
return result;
}
Date start = DateUtils.stringToDate(startTime);
if (Objects.isNull(endTime)) {
putMsg(result, Status.DATA_IS_NULL, Constants.END_TIME);
return result;
}
Date end = DateUtils.stringToDate(endTime);
if(start == null || end == null) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "startDate,endDate");
return result;
}
if (start.getTime() > end.getTime()) {
putMsg(result, Status.START_TIME_BIGGER_THAN_END_TIME_ERROR, startTime, endTime);
return result;
}
List<ProcessInstance> processInstances = processInstanceMapper.queryTopNProcessInstance(size, start, end, ExecutionStatus.SUCCESS);
result.put(DATA_LIST, processInstances);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* query process instance by id
*
......
......@@ -147,6 +147,40 @@ public class ProcessInstanceServiceTest {
}
@Test
public void testQueryTopNLongestRunningProcessInstance() {
String projectName = "project_test1";
User loginUser = getAdminUser();
Map<String, Object> result = new HashMap<>(5);
putMsg(result, Status.PROJECT_NOT_FOUNT, projectName);
int size=10;
String startTime="2020-01-01 00:00:00";
String endTime="2020-08-02 00:00:00";
Date start = DateUtils.getScheduleDate(startTime);
Date end = DateUtils.getScheduleDate(endTime);
//project auth fail
when(projectMapper.queryByName(projectName)).thenReturn(null);
when(projectService.checkProjectAndAuth(loginUser, null, projectName)).thenReturn(result);
Map<String, Object> proejctAuthFailRes = processInstanceService.queryTopNLongestRunningProcessInstance(loginUser,projectName,size,startTime,endTime);
Assert.assertEquals(Status.PROJECT_NOT_FOUNT, proejctAuthFailRes.get(Constants.STATUS));
//project auth success
putMsg(result, Status.SUCCESS, projectName);
Project project = getProject(projectName);
ProcessInstance processInstance = getProcessInstance();
List<ProcessInstance> processInstanceList = new ArrayList<>();
processInstanceList.add(processInstance);
when(projectMapper.queryByName(projectName)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
when(usersService.queryUser(loginUser.getId())).thenReturn(loginUser);
when(usersService.getUserIdByName(loginUser.getUserName())).thenReturn(loginUser.getId());
when(usersService.queryUser(processInstance.getExecutorId())).thenReturn(loginUser);
Map<String, Object> successRes = processInstanceService.queryTopNLongestRunningProcessInstance(loginUser,projectName,size,startTime,endTime);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
@Test
public void testQueryProcessInstanceById() {
String projectName = "project_test1";
......
......@@ -978,6 +978,8 @@ public final class Constants {
public static final int NORAML_NODE_STATUS = 0;
public static final int ABNORMAL_NODE_STATUS = 1;
public static final String START_TIME = "start time";
public static final String END_TIME = "end time";
/**
* system line separator
*/
......
......@@ -207,7 +207,7 @@ public class ParameterUtils {
public static String handleEscapes(String inputString){
if(StringUtils.isNotEmpty(inputString)){
return inputString.replace("%", "////%");
return inputString.replace("%", "////%").replaceAll("[\n|\r\t]", "_");
}
return inputString;
}
......
......@@ -193,4 +193,17 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
ProcessInstance queryLastManualProcess(@Param("processDefinitionId") int definitionId,
@Param("startTime") Date startTime,
@Param("endTime") Date endTime);
/**
* query top n process instance order by running duration
* @param size
* @param status process instance status
* @param startTime
* @param endTime
* @return ProcessInstance list
*/
List<ProcessInstance> queryTopNProcessInstance(@Param("size") int size,
@Param("startTime") Date startTime,
@Param("endTime") Date endTime,
@Param("status")ExecutionStatus status);
}
......@@ -37,6 +37,16 @@
order by id asc
</select>
<select id="queryTopNProcessInstance" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select *
from t_ds_process_instance
where state = #{status}
and start_time between
#{startTime} and #{endTime}
order by end_time-start_time desc
limit #{size}
</select>
<select id="queryByTenantIdAndStatus" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select *
from t_ds_process_instance
......
......@@ -52,6 +52,25 @@ public class ProcessInstanceMapperTest {
ProjectMapper projectMapper;
/**
* insert process instance with specified start time and end time,set state to SUCCESS
*
* @param startTime
* @param endTime
* @return
*/
private ProcessInstance insertOne(Date startTime, Date endTime) {
ProcessInstance processInstance = new ProcessInstance();
Date start = startTime;
Date end = endTime;
processInstance.setStartTime(start);
processInstance.setEndTime(end);
processInstance.setState(ExecutionStatus.SUCCESS);
processInstanceMapper.insert(processInstance);
return processInstance;
}
/**
* insert
* @return ProcessInstance
......@@ -329,4 +348,50 @@ public class ProcessInstanceMapperTest {
processInstanceMapper.deleteById(processInstance.getId());
}
/**
* test whether it is in descending order by running duration
*
* @param processInstances
* @return
*/
private boolean isSortedByDuration(List<ProcessInstance> processInstances) {
for (int i = 1; i < processInstances.size(); i++) {
long d1 = processInstances.get(i).getEndTime().getTime() - processInstances.get(i).getStartTime().getTime();
long d2 = processInstances.get(i - 1).getEndTime().getTime() - processInstances.get(i - 1).getStartTime().getTime();
if (d1 > d2) {
return false;
}
}
return true;
}
/**
* test query top n process instance order by running duration
*/
@Test
public void testQueryTopNProcessInstance() {
Date startTime1 = new Date(2019, 7, 9, 10, 9, 9);
Date endTime1 = new Date(2019, 7, 9, 10, 9, 14);
Date startTime2 = new Date(2020, 7, 9, 10, 9, 9);
Date endTime2 = new Date(2020, 7, 9, 10, 9, 30);
Date startTime3 = new Date(2020, 6, 9, 10, 9, 9);
Date endTime3 = new Date(2020, 7, 9, 10, 9, 30);
ProcessInstance processInstance1 = insertOne(startTime1, endTime1);
ProcessInstance processInstance2 = insertOne(startTime2, endTime2);
ProcessInstance processInstance3 = insertOne(startTime3, endTime3);
Date start = new Date(2020, 1, 1, 1, 1, 1);
Date end = new Date(2021, 1, 1, 1, 1, 1);
List<ProcessInstance> processInstances = processInstanceMapper.queryTopNProcessInstance(2, start, end,ExecutionStatus.SUCCESS);
Assert.assertEquals(2, processInstances.size());
Assert.assertTrue(isSortedByDuration(processInstances));
for (ProcessInstance processInstance : processInstances) {
Assert.assertTrue(processInstance.getState().typeIsSuccess());
}
processInstanceMapper.deleteById(processInstance1.getId());
processInstanceMapper.deleteById(processInstance2.getId());
processInstanceMapper.deleteById(processInstance3.getId());
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册