Unverified commit 2b519f31 authored by wangxj3, committed by GitHub

[Feature-#6422] task group queue (#6722)

* add task group

* modify task group

* pull dev

* add license header

* modify code style

* fix code style

* fix sql error

* fix error

* fix test

* fix test

* fix test

* fix test

* fix code style

* fix ut

* code style

* fix unit test

* test ut

* ut

* add unittest

* test ut

* modify back ut

* majorization code

* fix conflict

* fix ut

Co-authored-by: wangxj <wangxj31>
Parent: f480b8ad
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.CLOSE_TASK_GROUP_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.CREATE_TASK_GROUP_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_GROUP_LIST_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_GROUP_QUEUE_LIST_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.START_TASK_GROUP_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_TASK_GROUP_ERROR;
import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.TaskGroupQueueService;
import org.apache.dolphinscheduler.api.service.TaskGroupService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.entity.User;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import springfox.documentation.annotations.ApiIgnore;
/**
* task group controller
*/
@Api(tags = "task group")
@RestController
@RequestMapping("/task-group")
public class TaskGroupController extends BaseController {
@Autowired
private TaskGroupService taskGroupService;
/**
* create a task group
*
* @param loginUser login user
* @param name task group name
* @param description task group description
* @param groupSize task group size
* @return result and msg code
*/
@ApiOperation(value = "createTaskGroup", notes = "CREATE_TAKS_GROUP_NOTE")
@ApiImplicitParams({
@ApiImplicitParam(name = "name", value = "NAME", dataType = "String"),
@ApiImplicitParam(name = "description", value = "DESCRIPTION", dataType = "String"),
@ApiImplicitParam(name = "groupSize", value = "GROUPSIZE", dataType = "Int"),
})
@PostMapping(value = "/create")
@ResponseStatus(HttpStatus.CREATED)
@ApiException(CREATE_TASK_GROUP_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result createTaskGroup(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("name") String name,
@RequestParam("description") String description,
@RequestParam("groupSize") Integer groupSize) {
Map<String, Object> result = taskGroupService.createTaskGroup(loginUser, name, description, groupSize);
return returnDataList(result);
}
/**
* update a task group
*
* @param loginUser login user
* @param id task group id
* @param name task group name
* @param description task group description
* @param groupSize task group size
* @return result and msg code
*/
@ApiOperation(value = "updateTaskGroup", notes = "UPDATE_TAKS_GROUP_NOTE")
@ApiImplicitParams({
@ApiImplicitParam(name = "id", value = "id", dataType = "Int"),
@ApiImplicitParam(name = "name", value = "NAME", dataType = "String"),
@ApiImplicitParam(name = "description", value = "DESCRIPTION", dataType = "String"),
@ApiImplicitParam(name = "groupSize", value = "GROUPSIZE", dataType = "Int"),
})
@PostMapping(value = "/update")
@ResponseStatus(HttpStatus.CREATED)
@ApiException(UPDATE_TASK_GROUP_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result updateTaskGroup(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("id") Integer id,
@RequestParam("name") String name,
@RequestParam("description") String description,
@RequestParam("groupSize") Integer groupSize) {
Map<String, Object> result = taskGroupService.updateTaskGroup(loginUser, id, name, description, groupSize);
return returnDataList(result);
}
/**
* query task group list paging
*
* @param loginUser login user
* @param pageNo page number
* @param pageSize page size
* @return queue list
*/
@ApiOperation(value = "queryAllTaskGroup", notes = "QUERY_ALL_TASK_GROUP_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "20")
})
@GetMapping(value = "/query-list-all")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_GROUP_LIST_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result queryAllTaskGroup(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
Map<String, Object> result = taskGroupService.queryAllTaskGroup(loginUser, pageNo, pageSize);
return returnDataList(result);
}
/**
* query task group list paging
*
* @param loginUser login user
* @param pageNo page number
* @param status status
* @param pageSize page size
* @return queue list
*/
@ApiOperation(value = "queryTaskGroupByStatus", notes = "QUERY_TASK_GROUP_LIST_BY_STSATUS_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "20"),
@ApiImplicitParam(name = "status", value = "status", required = true, dataType = "Int")
})
@GetMapping(value = "/query-list-by-status")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_GROUP_LIST_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result queryTaskGroupByStatus(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("pageNo") Integer pageNo,
@RequestParam(value = "status", required = false) Integer status,
@RequestParam("pageSize") Integer pageSize) {
Map<String, Object> result = taskGroupService.queryTaskGroupByStatus(loginUser, pageNo, pageSize, status);
return returnDataList(result);
}
/**
* query task group list paging by name
*
* @param loginUser login user
* @param pageNo page number
* @param name task group name
* @param pageSize page size
* @return queue list
*/
@ApiOperation(value = "queryTaskGroupByName", notes = "QUERY_TASK_GROUP_LIST_BY_PROJECT_ID_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "20"),
@ApiImplicitParam(name = "name", value = "PROJECT_ID", required = true, dataType = "String")
})
@GetMapping(value = "/query-list-by-name")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_GROUP_LIST_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result queryTaskGroupByName(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("pageNo") Integer pageNo,
@RequestParam(value = "name", required = false) String name,
@RequestParam("pageSize") Integer pageSize) {
Map<String, Object> result = taskGroupService.queryTaskGroupByName(loginUser, pageNo, pageSize, name);
return returnDataList(result);
}
/**
* close a task group
*
* @param loginUser login user
* @param id id
* @return result
*/
@ApiOperation(value = "closeTaskGroup", notes = "CLOSE_TASK_GROUP_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "id", value = "ID", required = true, dataType = "Int")
})
@PostMapping(value = "/close-task-group")
@ResponseStatus(HttpStatus.CREATED)
@ApiException(CLOSE_TASK_GROUP_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result closeTaskGroup(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "id", required = false) Integer id) {
Map<String, Object> result = taskGroupService.closeTaskGroup(loginUser, id);
return returnDataList(result);
}
/**
* start a task group
*
* @param loginUser login user
* @param id id
* @return result
*/
@ApiOperation(value = "startTaskGroup", notes = "START_TASK_GROUP_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "id", value = "ID", required = true, dataType = "Int")
})
@PostMapping(value = "/start-task-group")
@ResponseStatus(HttpStatus.CREATED)
@ApiException(START_TASK_GROUP_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result startTaskGroup(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "id", required = false) Integer id) {
Map<String, Object> result = taskGroupService.startTaskGroup(loginUser, id);
return returnDataList(result);
}
/**
* force start task without task group
*
* @param loginUser login user
* @param taskId task id
* @return result
*/
@ApiOperation(value = "wakeCompulsively", notes = "WAKE_TASK_COMPULSIVELY_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "taskId", value = "TASKID", required = true, dataType = "Int")
})
@PostMapping(value = "/wake-task-compulsively")
@ResponseStatus(HttpStatus.CREATED)
@ApiException(START_TASK_GROUP_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result wakeCompulsively(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "taskId") Integer taskId) {
Map<String, Object> result = taskGroupService.wakeTaskcompulsively(loginUser, taskId);
return returnDataList(result);
}
@Autowired
private TaskGroupQueueService taskGroupQueueService;
/**
* query task group queue list paging
*
* @param loginUser login user
* @param groupId group id
* @param pageNo page number
* @param pageSize page size
* @return queue list
*/
@ApiOperation(value = "queryTasksByGroupId", notes = "QUERY_ALL_TASKS_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "groupId", value = "GROUP_ID", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "20")
})
@GetMapping(value = "/query-list-by-group-id")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_GROUP_QUEUE_LIST_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result queryTasksByGroupId(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("groupId") Integer groupId,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
Map<String, Object> result = taskGroupQueueService.queryTasksByGroupId(loginUser, groupId, pageNo, pageSize);
return returnDataList(result);
}
}
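For illustration only (not part of this commit), here is a minimal sketch of how a client might call the new create endpoint over HTTP with Spring's RestTemplate. The base URL, context path, and the "sessionId" header value are assumptions and must match a running api-server deployment.

import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;

public class TaskGroupApiClientSketch {
    public static void main(String[] args) {
        RestTemplate restTemplate = new RestTemplate();

        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
        // assumption: a valid session id previously obtained from the login endpoint
        headers.add("sessionId", "REPLACE_WITH_SESSION_ID");

        MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
        params.add("name", "demo-task-group");
        params.add("description", "demo task group created for illustration");
        params.add("groupSize", "10");

        // assumption: host, port and context path of the api-server
        String url = "http://localhost:12345/dolphinscheduler/task-group/create";
        String response = restTemplate.postForObject(url, new HttpEntity<>(params, headers), String.class);
        System.out.println(response);
    }
}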
@@ -340,10 +340,26 @@ public enum Status {
QUERY_ENVIRONMENT_BY_CODE_ERROR(1200009, "not found environment [{0}] ", "查询环境编码[{0}]不存在"),
QUERY_ENVIRONMENT_ERROR(1200010, "login user query environment error", "分页查询环境列表错误"),
VERIFY_ENVIRONMENT_ERROR(1200011, "verify environment error", "验证环境信息错误"),
ENVIRONMENT_WORKER_GROUPS_IS_INVALID(1200012, "environment worker groups is invalid format", "环境关联的工作组参数解析错误"),
UPDATE_ENVIRONMENT_WORKER_GROUP_RELATION_ERROR(1200013,"You can't modify the worker group, because the worker group [{0}] and this environment [{1}] already be used in the task [{2}]",
"您不能修改工作组选项,因为该工作组 [{0}] 和 该环境 [{1}] 已经被用在任务 [{2}] 中");
TASK_GROUP_NAME_EXSIT(130001,"this task group name is repeated in a project","该任务组名称在一个项目中已经使用"),
TASK_GROUP_SIZE_ERROR(130002,"task group size error","任务组大小应该为大于1的整数"),
TASK_GROUP_STATUS_ERROR(130003,"task group status error","任务组已经被关闭"),
TASK_GROUP_FULL(130004,"task group is full","任务组已经满了"),
TASK_GROUP_USED_SIZE_ERROR(130005,"the used size number of task group is dirty","任务组使用的容量发生了变化"),
TASK_GROUP_QUEUE_RELEASE_ERROR(130006,"release task group queue failed","任务组资源释放时出现了错误"),
TASK_GROUP_QUEUE_AWAKE_ERROR(130007,"awake waiting task failed","任务组使唤醒等待任务时发生了错误"),
CREATE_TASK_GROUP_ERROR(130008,"create task group error","创建任务组错误"),
UPDATE_TASK_GROUP_ERROR(130009,"update task group list error","更新任务组错误"),
QUERY_TASK_GROUP_LIST_ERROR(130010,"query task group list error","查询任务组列表错误"),
CLOSE_TASK_GROUP_ERROR(130011,"close task group error","关闭任务组错误"),
START_TASK_GROUP_ERROR(130012,"start task group error","启动任务组错误"),
QUERY_TASK_GROUP_QUEUE_LIST_ERROR(130013,"query task group queue list error","查询任务组队列列表错误"),
TASK_GROUP_CACHE_START_FAILED(130014,"cache start failed","任务组相关的缓存启动失败"),
ENVIRONMENT_WORKER_GROUPS_IS_INVALID(130015, "environment worker groups is invalid format", "环境关联的工作组参数解析错误"),
UPDATE_ENVIRONMENT_WORKER_GROUP_RELATION_ERROR(130016,"You can't modify the worker group, because the worker group [{0}] and this environment [{1}] already be used in the task [{2}]",
"您不能修改工作组选项,因为该工作组 [{0}] 和 该环境 [{1}] 已经被用在任务 [{2}] 中"),
TASK_GROUP_QUEUE_ALREADY_START(130017, "task group queue already start", "节点已经获取任务组资源")
;
private final int code;
private final String enMsg;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.dao.entity.User;
import java.util.Map;
/**
* task group queue service
*/
public interface TaskGroupQueueService {
/**
* query tasks in task group queue by group id
* @param loginUser login user
* @param groupId group id
* @param pageNo page no
* @param pageSize page size
* @return tasks list
*/
Map<String, Object> queryTasksByGroupId(User loginUser, int groupId, int pageNo,
int pageSize);
/**
* query tasks in task group queue by project id
* @param loginUser login user
* @param pageNo page no
* @param pageSize page size
* @param processId process id
* @return tasks list
*/
Map<String, Object> queryTasksByProcessId(User loginUser, int pageNo,
int pageSize, int processId);
/**
* query all tasks in task group queue
* @param loginUser login user
* @param pageNo page no
* @param pageSize page size
* @return tasks list
*/
Map<String, Object> queryAllTasks(User loginUser, int pageNo, int pageSize);
/**
* delete by task id
* @param taskId task id
* @return whether the record was deleted
*/
boolean deleteByTaskId(int taskId);
void forceStartTask(int taskId, int forceStart);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.dao.entity.User;
import java.util.Map;
/**
* task group service
*/
public interface TaskGroupService {
/**
* create a Task group
*
* @param loginUser login user
* @param name task group name
* @param description task group description
* @param groupSize task group total size
* @return the result code and msg
*/
Map<String, Object> createTaskGroup(User loginUser, String name,
String description, int groupSize);
/**
* update the task group
*
* @param loginUser login user
* @param id task group id
* @param name task group name
* @param description task group description
* @param groupSize task group total size
* @return the result code and msg
*/
Map<String, Object> updateTaskGroup(User loginUser, int id, String name,
String description, int groupSize);
/**
* check whether the task group is available
*
* @param id task group id
* @return true if the task group is available
*/
boolean isTheTaskGroupAvailable(int id);
/**
* query all task group by user id
*
* @param loginUser login user
* @param pageNo page no
* @param pageSize page size
* @return the result code and msg
*/
Map<String, Object> queryAllTaskGroup(User loginUser, int pageNo, int pageSize);
/**
* query all task group by status
*
* @param loginUser login user
* @param pageNo page no
* @param pageSize page size
* @param status status
* @return the result code and msg
*/
Map<String, Object> queryTaskGroupByStatus(User loginUser, int pageNo, int pageSize, int status);
/**
* query all task group by name
*
* @param loginUser login user
* @param pageNo page no
* @param pageSize page size
* @param name name
* @return the result code and msg
*/
Map<String, Object> queryTaskGroupByName(User loginUser, int pageNo, int pageSize, String name);
/**
* query all task group by id
*
* @param loginUser login user
* @param id id
* @return the result code and msg
*/
Map<String, Object> queryTaskGroupById(User loginUser, int id);
/**
* query task group list paging by user id, name and status
*
* @param loginUser login user
* @param pageNo page no
* @param pageSize page size
* @param userId user id
* @param name name
* @param status status
* @return the result code and msg
*/
Map<String, Object> doQuery(User loginUser, int pageNo, int pageSize, int userId, String name, int status);
/**
* close a task group
*
* @param loginUser login user
* @param id task group id
* @return the result code and msg
*/
Map<String, Object> closeTaskGroup(User loginUser, int id);
/**
* start a task group
*
* @param loginUser login user
* @param id task group id
* @return the result code and msg
*/
Map<String, Object> startTaskGroup(User loginUser, int id);
/**
* wake a task manually
*
* @param loginUser login user
* @param taskId task id
* @return result
*/
Map<String, Object> wakeTaskcompulsively(User loginUser, int taskId);
}
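For orientation only (not part of this commit), a minimal usage sketch of this interface from an admin-side caller. The instance would normally be injected by Spring, and the group id 1 and task id 100 used below are hypothetical.

package org.apache.dolphinscheduler.api.service; // placed here only so the interface resolves

import java.util.Map;

import org.apache.dolphinscheduler.dao.entity.User;

public class TaskGroupServiceUsageSketch {

    private final TaskGroupService taskGroupService;

    public TaskGroupServiceUsageSketch(TaskGroupService taskGroupService) {
        this.taskGroupService = taskGroupService;
    }

    public void demo(User adminUser) {
        // create a group that allows at most 10 tasks to hold a slot at the same time
        Map<String, Object> created = taskGroupService.createTaskGroup(adminUser, "demo-group", "demo", 10);

        // page through the caller's task groups
        Map<String, Object> page = taskGroupService.queryAllTaskGroup(adminUser, 1, 20);

        // close the group so no new task can acquire a slot, then re-open it
        taskGroupService.closeTaskGroup(adminUser, 1);
        taskGroupService.startTaskGroup(adminUser, 1);

        // force-wake a task that is waiting on a group slot
        taskGroupService.wakeTaskcompulsively(adminUser, 100);
    }
}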
@@ -24,7 +24,6 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ExecutorService;
@@ -34,10 +33,12 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.DateUtils;
@@ -47,6 +48,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
@@ -73,6 +75,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.core.type.TypeReference;
/**
* executor service impl
*/
@@ -403,6 +407,27 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return result;
}
/**
* force start a task instance that is waiting in a task group queue
*
* @param processInstance process instance
* @param taskId task id
* @return force start result
*/
private Map<String, Object> forceStartTaskInstance(ProcessInstance processInstance, int taskId) {
Map<String, Object> result = new HashMap<>();
TaskGroupQueue taskGroupQueue = processService.loadTaskGroupQueue(taskId);
if (taskGroupQueue.getStatus() != TaskGroupQueueStatus.WAIT_QUEUE) {
putMsg(result, Status.TASK_GROUP_QUEUE_ALREADY_START);
return result;
}
taskGroupQueue.setForceStart(Flag.YES.getCode());
processService.updateTaskGroupQueue(taskGroupQueue);
processService.sendStartTask2Master(processInstance, taskId,
        org.apache.dolphinscheduler.remote.command.CommandType.TASK_FORCE_STATE_EVENT_REQUEST);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* insert command, used in the implementation of the page, re run, recovery (pause / failure) execution
*
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.TaskGroupQueueService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* task group queue service
*/
@Service
public class TaskGroupQueueServiceImpl extends BaseServiceImpl implements TaskGroupQueueService {
@Autowired
TaskGroupQueueMapper taskGroupQueueMapper;
@Autowired
private TaskInstanceMapper taskInstanceMapper;
private static final Logger logger = LoggerFactory.getLogger(TaskGroupQueueServiceImpl.class);
/**
* query tasks in task group queue by group id
*
* @param loginUser login user
* @param groupId group id
* @param pageNo page no
* @param pageSize page size
* @return tasks list
*/
@Override
public Map<String, Object> queryTasksByGroupId(User loginUser, int groupId, int pageNo, int pageSize) {
return this.doQuery(loginUser, pageNo, pageSize, groupId);
}
/**
* query tasks in task group queue by project id
*
* @param loginUser login user
* @param pageNo page no
* @param pageSize page size
* @param processId process id
* @return tasks list
*/
@Override
public Map<String, Object> queryTasksByProcessId(User loginUser, int pageNo, int pageSize, int processId) {
return this.doQuery(loginUser, pageNo, pageSize, processId);
}
/**
* query all tasks in task group queue
*
* @param loginUser login user
* @param pageNo page no
* @param pageSize page size
* @return tasks list
*/
@Override
public Map<String, Object> queryAllTasks(User loginUser, int pageNo, int pageSize) {
return this.doQuery(loginUser, pageNo, pageSize, 0);
}
public Map<String, Object> doQuery(User loginUser, int pageNo, int pageSize,
int groupId) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
return result;
}
Page<TaskGroupQueue> page = new Page<>(pageNo, pageSize);
IPage<TaskGroupQueue> taskGroupQueue = taskGroupQueueMapper.queryTaskGroupQueuePaging(page, groupId);
PageInfo<TaskGroupQueue> pageInfo = new PageInfo<>(pageNo, pageSize);
pageInfo.setTotal((int) taskGroupQueue.getTotal());
pageInfo.setTotalList(taskGroupQueue.getRecords());
result.put(Constants.DATA_LIST, pageInfo);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* delete by task id
*
* @param taskId task id
* @return whether the record was deleted
*/
@Override
public boolean deleteByTaskId(int taskId) {
return taskGroupQueueMapper.deleteByTaskId(taskId) == 1;
}
@Override
public void forceStartTask(int taskId, int forceStart) {
taskGroupQueueMapper.updateForceStart(taskId, forceStart);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.TaskGroupQueueService;
import org.apache.dolphinscheduler.api.service.TaskGroupService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.dao.entity.TaskGroup;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* task Group Service
*/
@Service
public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupService {
@Autowired
private TaskGroupMapper taskGroupMapper;
@Autowired
private TaskGroupQueueService taskGroupQueueService;
@Autowired
private ProcessService processService;
private static final Logger logger = LoggerFactory.getLogger(TaskGroupServiceImpl.class);
/**
* create a Task group
*
* @param loginUser login user
* @param name task group name
* @param description task group description
* @param groupSize task group total size
* @return the result code and msg
*/
@Override
public Map<String, Object> createTaskGroup(User loginUser, String name, String description, int groupSize) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
return result;
}
if (name == null) {
putMsg(result, Status.NAME_NULL);
return result;
}
if (groupSize <= 0) {
putMsg(result, Status.TASK_GROUP_SIZE_ERROR);
return result;
}
TaskGroup taskGroup1 = taskGroupMapper.queryByName(loginUser.getId(), name);
if (taskGroup1 != null) {
putMsg(result, Status.TASK_GROUP_NAME_EXSIT);
return result;
}
TaskGroup taskGroup = new TaskGroup(name, description,
groupSize, loginUser.getId(), Flag.YES.getCode());
int insert = taskGroupMapper.insert(taskGroup);
logger.info("insert result:{}", insert);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* update the task group
*
* @param loginUser login user
* @param name task group name
* @param description task group description
* @param groupSize task group total size
* @return the result code and msg
*/
@Override
public Map<String, Object> updateTaskGroup(User loginUser, int id, String name, String description, int groupSize) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
return result;
}
TaskGroup taskGroup = taskGroupMapper.selectById(id);
if (taskGroup.getStatus() != Flag.YES.getCode()) {
putMsg(result, Status.TASK_GROUP_STATUS_ERROR);
return result;
}
taskGroup.setGroupSize(groupSize);
taskGroup.setDescription(description);
if (StringUtils.isNotEmpty(name)) {
taskGroup.setName(name);
}
int i = taskGroupMapper.updateById(taskGroup);
logger.info("update result:{}", i);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* get task group status
*
* @param id task group id
* @return is the task group available
*/
@Override
public boolean isTheTaskGroupAvailable(int id) {
return taskGroupMapper.selectCountByIdStatus(id, Flag.YES.getCode()) == 1;
}
/**
* query all task group by user id
*
* @param loginUser login user
* @param pageNo page no
* @param pageSize page size
* @return the result code and msg
*/
@Override
public Map<String, Object> queryAllTaskGroup(User loginUser, int pageNo, int pageSize) {
return this.doQuery(loginUser, pageNo, pageSize, loginUser.getId(), null, 0);
}
/**
* query all task group by status
*
* @param loginUser login user
* @param pageNo page no
* @param pageSize page size
* @param status status
* @return the result code and msg
*/
@Override
public Map<String, Object> queryTaskGroupByStatus(User loginUser, int pageNo, int pageSize, int status) {
return this.doQuery(loginUser, pageNo, pageSize, loginUser.getId(), null, status);
}
/**
* query all task group by name
*
* @param loginUser login user
* @param pageNo page no
* @param pageSize page size
* @param name name
* @return the result code and msg
*/
@Override
public Map<String, Object> queryTaskGroupByName(User loginUser, int pageNo, int pageSize, String name) {
return this.doQuery(loginUser, pageNo, pageSize, loginUser.getId(), name, 0);
}
/**
* query all task group by id
*
* @param loginUser login user
* @param id id
* @return the result code and msg
*/
@Override
public Map<String, Object> queryTaskGroupById(User loginUser, int id) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
return result;
}
TaskGroup taskGroup = taskGroupMapper.selectById(id);
result.put(Constants.DATA_LIST, taskGroup);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* query task group list paging by user id, name and status
*
* @param loginUser login user
* @param pageNo page no
* @param pageSize page size
* @param userId user id
* @param name name
* @param status status
* @return the result code and msg
*/
@Override
public Map<String, Object> doQuery(User loginUser, int pageNo, int pageSize, int userId, String name, int status) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
return result;
}
Page<TaskGroup> page = new Page<>(pageNo, pageSize);
IPage<TaskGroup> taskGroupPaging = taskGroupMapper.queryTaskGroupPaging(page, userId, name, status);
PageInfo<TaskGroup> pageInfo = new PageInfo<>(pageNo, pageSize);
int total = taskGroupPaging == null ? 0 : (int) taskGroupPaging.getTotal();
List<TaskGroup> list = taskGroupPaging == null ? new ArrayList<TaskGroup>() : taskGroupPaging.getRecords();
pageInfo.setTotal(total);
pageInfo.setTotalList(list);
result.put(Constants.DATA_LIST, pageInfo);
logger.info("select result:{}", taskGroupPaging);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* close a task group
*
* @param loginUser login user
* @param id task group id
* @return the result code and msg
*/
@Override
public Map<String, Object> closeTaskGroup(User loginUser, int id) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
return result;
}
TaskGroup taskGroup = taskGroupMapper.selectById(id);
taskGroup.setStatus(Flag.NO.getCode());
taskGroupMapper.updateById(taskGroup);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* start a task group
*
* @param loginUser login user
* @param id task group id
* @return the result code and msg
*/
@Override
public Map<String, Object> startTaskGroup(User loginUser, int id) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
return result;
}
TaskGroup taskGroup = taskGroupMapper.selectById(id);
if (taskGroup.getStatus() == Flag.YES.getCode()) {
putMsg(result, Status.TASK_GROUP_STATUS_ERROR);
return result;
}
taskGroup.setStatus(Flag.YES.getCode());
taskGroup.setUpdateTime(new Date(System.currentTimeMillis()));
int update = taskGroupMapper.updateById(taskGroup);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* wake a task manually
*
* @param loginUser login user
* @param taskId task id
* @return result
*/
@Override
public Map<String, Object> wakeTaskcompulsively(User loginUser, int taskId) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
return result;
}
taskGroupQueueService.forceStartTask(taskId, Flag.YES.getCode());
putMsg(result, Status.SUCCESS);
return result;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.controller;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
/**
* task group controller test
*/
public class TaskGroupControllerTest extends AbstractControllerTest {
private static Logger logger = LoggerFactory.getLogger(TaskGroupControllerTest.class);
private static final String QUEUE_CREATE_STRING = "queue1";
@Test
public void testQueryListAll() throws Exception {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("pageNo", "2");
paramsMap.add("pageSize", "2");
MvcResult mvcResult = mockMvc.perform(get("/task-group/query-list-all")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON))
.andReturn();
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
Assert.assertTrue(result != null && result.isSuccess());
logger.info("query list queue return result:{}", mvcResult.getResponse().getContentAsString());
}
@Test
public void testQueryByName() throws Exception {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("pageNo", "1");
paramsMap.add("name", "TGQ");
paramsMap.add("pageSize", "10");
MvcResult mvcResult = mockMvc.perform(get("/task-group/query-list-by-name")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON))
.andReturn();
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
Assert.assertTrue(result != null && result.isSuccess());
logger.info("query list queue return result:{}", mvcResult.getResponse().getContentAsString());
}
@Test
public void testQueryByStatus() throws Exception {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("pageNo", "1");
paramsMap.add("status", "1");
paramsMap.add("pageSize", "10");
MvcResult mvcResult = mockMvc.perform(get("/task-group/query-list-by-status")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON))
.andReturn();
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
Assert.assertTrue(result != null && result.isSuccess());
logger.info("query list queue return result:{}", mvcResult.getResponse().getContentAsString());
}
@Test
public void testCreateTaskGroup() throws Exception {
// success
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("name", "TGQ1");
paramsMap.add("description", "this is a task group queue!");
paramsMap.add("groupSize", "10");
MvcResult mvcResult = mockMvc.perform(post("/task-group/create")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isCreated())
.andExpect(content().contentType(MediaType.APPLICATION_JSON))
.andReturn();
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
Assert.assertTrue(result != null && result.isSuccess());
logger.info("create queue return result:{}", mvcResult.getResponse().getContentAsString());
// failed
// name exists
paramsMap.clear();
paramsMap.add("name", "TGQ1");
paramsMap.add("description", "this is a task group queue!");
paramsMap.add("groupSize", "10");
MvcResult mvcResult1 = mockMvc.perform(post("/task-group/create")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isCreated())
.andExpect(content().contentType(MediaType.APPLICATION_JSON))
.andReturn();
Result result1 = JSONUtils.parseObject(mvcResult1.getResponse().getContentAsString(), Result.class);
Assert.assertTrue(result1 != null && result1.isFailed());
logger.info("create queue return result:{}", mvcResult1.getResponse().getContentAsString());
}
@Test
public void testUpdateTaskGroup() throws Exception {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("id", "1");
paramsMap.add("name", "TGQ11");
paramsMap.add("description", "this is a task group queue!");
paramsMap.add("groupSize", "10");
MvcResult mvcResult = mockMvc.perform(post("/task-group/update")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isCreated())
.andExpect(content().contentType(MediaType.APPLICATION_JSON))
.andReturn();
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
logger.info("update queue return result:{}", mvcResult.getResponse().getContentAsString());
Assert.assertTrue(result != null && result.isSuccess());
logger.info("update queue return result:{}", mvcResult.getResponse().getContentAsString());
}
@Test
public void testCloseAndStartTaskGroup() throws Exception {
// close
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("id", "1");
MvcResult mvcResult = mockMvc.perform(post("/task-group/close-task-group")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isCreated())
.andExpect(content().contentType(MediaType.APPLICATION_JSON))
.andReturn();
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
logger.info("update queue return result:{}", mvcResult.getResponse().getContentAsString());
Assert.assertTrue(result != null && result.isSuccess());
logger.info("update queue return result:{}", mvcResult.getResponse().getContentAsString());
// start
paramsMap.clear();
paramsMap.add("id", "1");
MvcResult mvcResult1 = mockMvc.perform(post("/task-group/start-task-group")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isCreated())
.andExpect(content().contentType(MediaType.APPLICATION_JSON))
.andReturn();
Result result1 = JSONUtils.parseObject(mvcResult1.getResponse().getContentAsString(), Result.class);
logger.info("update queue return result:{}", mvcResult1.getResponse().getContentAsString());
Assert.assertTrue(result1 != null && result1.isSuccess());
logger.info("update queue return result:{}", mvcResult.getResponse().getContentAsString());
}
@Test
public void testWakeCompulsively() throws Exception {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("id", "1");
paramsMap.add("taskId", "1");
MvcResult mvcResult = mockMvc.perform(post("/task-group/wake-task-compulsively")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isCreated())
.andExpect(content().contentType(MediaType.APPLICATION_JSON))
.andReturn();
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
logger.info("update queue return result:{}", mvcResult.getResponse().getContentAsString());
Assert.assertTrue(result != null && (result.isSuccess() || result.isStatus(Status.TASK_GROUP_CACHE_START_FAILED)));
logger.info("update queue return result:{}", mvcResult.getResponse().getContentAsString());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.controller;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
/**
* task group queue controller test
*/
public class TaskGroupQueueControllerTest extends AbstractControllerTest {
private static Logger logger = LoggerFactory.getLogger(TaskGroupQueueControllerTest.class);
private static final String QUEUE_CREATE_STRING = "queue1";
@Test
public void queryTasksByGroupId() throws Exception {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("groupId", "1");
paramsMap.add("pageNo", "1");
paramsMap.add("pageSize", "10");
MvcResult mvcResult = mockMvc.perform(get("/task-group/query-list-by-group-id")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON))
.andReturn();
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
Assert.assertTrue(result != null && result.isSuccess());
logger.info("query list queue return result:{}", mvcResult.getResponse().getContentAsString());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.service.impl.TaskGroupQueueServiceImpl;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* task group queue service test
**/
@RunWith(MockitoJUnitRunner.class)
public class TaskGroupQueueServiceTest {
private static final Logger logger = LoggerFactory.getLogger(TaskGroupQueueServiceTest.class);
@InjectMocks
private TaskGroupQueueServiceImpl taskGroupQueueService;
@Mock
private TaskGroupQueueMapper taskGroupQueueMapper;
private String userName = "test";
private String taskName = "taskGroupQueueServiceTest";
/**
* create admin user
*/
private User getLoginUser() {
User loginUser = new User();
loginUser.setUserType(UserType.ADMIN_USER);
loginUser.setUserName(userName);
loginUser.setId(1);
return loginUser;
}
private TaskGroupQueue getTaskGroupQueue() {
TaskGroupQueue taskGroupQueue = new TaskGroupQueue();
taskGroupQueue.setTaskName(taskName);
taskGroupQueue.setId(1);
taskGroupQueue.setGroupId(1);
taskGroupQueue.setTaskId(1);
taskGroupQueue.setPriority(1);
taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
Date date = new Date(System.currentTimeMillis());
taskGroupQueue.setUpdateTime(date);
taskGroupQueue.setCreateTime(date);
return taskGroupQueue;
}
@Test
public void testDoQuery() {
User user = getLoginUser();
IPage<TaskGroupQueue> page = new Page<>(1, 10);
page.setTotal(1L);
List<TaskGroupQueue> records = new ArrayList<>();
records.add(getTaskGroupQueue());
page.setRecords(records);
Mockito.when(taskGroupQueueMapper.queryTaskGroupQueuePaging(Mockito.any(Page.class), Mockito.eq(10))).thenReturn(page);
Map<String, Object> result = taskGroupQueueService.doQuery(user, 1, 1, 10);
PageInfo<TaskGroupQueue> pageInfo = (PageInfo<TaskGroupQueue>) result.get(Constants.DATA_LIST);
Assert.assertTrue(CollectionUtils.isNotEmpty(pageInfo.getTotalList()));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.TaskGroupServiceImpl;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.TaskGroup;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* task group service test
**/
@RunWith(MockitoJUnitRunner.Silent.class)
public class TaskGroupServiceTest {
private static final Logger logger = LoggerFactory.getLogger(TaskGroupServiceTest.class);
@InjectMocks
private TaskGroupServiceImpl taskGroupService;
@Mock
private TaskGroupQueueService taskGroupQueueService;
@Mock
private ProcessService processService;
@Mock
private TaskGroupMapper taskGroupMapper;
@Mock
private TaskGroupQueueMapper taskGroupQueueMapper;
@Mock
private UserMapper userMapper;
private String taskGroupName = "TaskGroupServiceTest";
private String taskGroupDesc = "this is a task group";
private String userName = "taskGroupServiceTest";
/**
* create admin user
*/
private User getLoginUser() {
User loginUser = new User();
loginUser.setUserType(UserType.ADMIN_USER);
loginUser.setUserName(userName);
loginUser.setId(1);
return loginUser;
}
private TaskGroup getTaskGroup() {
TaskGroup taskGroup = new TaskGroup(taskGroupName, taskGroupDesc,
100, 1, 1);
return taskGroup;
}
private List<TaskGroup> getList() {
List<TaskGroup> list = new ArrayList<>();
list.add(getTaskGroup());
return list;
}
@Test
public void testCreate() {
User loginUser = getLoginUser();
TaskGroup taskGroup = getTaskGroup();
Mockito.when(taskGroupMapper.insert(taskGroup)).thenReturn(1);
Mockito.when(taskGroupMapper.queryByName(loginUser.getId(), taskGroupName)).thenReturn(null);
Map<String, Object> result = taskGroupService.createTaskGroup(loginUser, taskGroupName, taskGroupDesc, 100);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
@Test
public void testQueryById() {
User loginUser = getLoginUser();
TaskGroup taskGroup = getTaskGroup();
Mockito.when(taskGroupMapper.selectById(1)).thenReturn(taskGroup);
Map<String, Object> result = taskGroupService.queryTaskGroupById(loginUser, 1);
Assert.assertNotNull(result.get(Constants.DATA_LIST));
}
@Test
public void testQueryProjectListPaging() {
IPage<TaskGroup> page = new Page<>(1, 10);
page.setRecords(getList());
User loginUser = getLoginUser();
Mockito.when(taskGroupMapper.queryTaskGroupPaging(Mockito.any(Page.class), Mockito.eq(10),
Mockito.eq(null), Mockito.eq(0))).thenReturn(page);
// query all
Map<String, Object> result = taskGroupService.queryAllTaskGroup(loginUser, 1, 10);
PageInfo<TaskGroup> pageInfo = (PageInfo<TaskGroup>) result.get(Constants.DATA_LIST);
Assert.assertNotNull(pageInfo.getTotalList());
}
@Test
public void testUpdate() {
User loginUser = getLoginUser();
TaskGroup taskGroup = getTaskGroup();
taskGroup.setStatus(Flag.YES.getCode());
// task group is available, so the update should succeed
Mockito.when(taskGroupMapper.selectById(1)).thenReturn(taskGroup);
Map<String, Object> result = taskGroupService.updateTaskGroup(loginUser, 1, "newName", "desc", 100);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
taskGroup.setStatus(0);
}
@Test
public void testCloseAndStart() {
User loginUser = getLoginUser();
TaskGroup taskGroup = getTaskGroup();
Mockito.when(taskGroupMapper.selectById(1)).thenReturn(taskGroup);
// close should succeed for an admin user
Map<String, Object> result1 = taskGroupService.closeTaskGroup(loginUser, 1);
Assert.assertEquals(Status.SUCCESS, result1.get(Constants.STATUS));
}
@Test
public void testWakeTaskForceManually() {
TreeMap<Integer, Integer> tm = new TreeMap<>();
tm.put(1, 1);
Map<String, Object> map1 = taskGroupService.wakeTaskcompulsively(getLoginUser(), 1);
Assert.assertEquals(Status.SUCCESS, map1.get(Constants.STATUS));
}
}
@@ -24,7 +24,9 @@ public enum StateEventType {
PROCESS_STATE_CHANGE(0, "process statechange"),
TASK_STATE_CHANGE(1, "task state change"),
PROCESS_TIMEOUT(2, "process timeout"),
TASK_TIMEOUT(3, "task timeout");
TASK_TIMEOUT(3, "task timeout"),
WAIT_TASK_GROUP(4, "wait task group"),
;
StateEventType(int code, String descp) {
this.code = code;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.enums;
import java.util.HashMap;
import com.baomidou.mybatisplus.annotation.EnumValue;
/**
* running status for task group queue
*/
public enum TaskGroupQueueStatus {
WAIT_QUEUE(-1, "wait queue"),
ACQUIRE_SUCCESS(1, "acquire success"),
RELEASE(2, "release");
@EnumValue
private final int code;
private final String descp;
private static final HashMap<Integer, TaskGroupQueueStatus> STATUS_MAP = new HashMap<>();
static {
for (TaskGroupQueueStatus taskGroupQueueStatus : TaskGroupQueueStatus.values()) {
STATUS_MAP.put(taskGroupQueueStatus.code, taskGroupQueueStatus);
}
}
TaskGroupQueueStatus(int code, String descp) {
this.code = code;
this.descp = descp;
}
public static TaskGroupQueueStatus of(int status) {
if (STATUS_MAP.containsKey(status)) {
return STATUS_MAP.get(status);
}
throw new IllegalArgumentException("invalid status : " + status);
}
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
}
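A minimal usage sketch (not part of the patch) for the enum above; the class name is illustrative, and only of(), getCode() and getDescp() come from this file.
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;

public class TaskGroupQueueStatusExample {
    public static void main(String[] args) {
        // -1 maps to WAIT_QUEUE, 1 to ACQUIRE_SUCCESS, 2 to RELEASE
        TaskGroupQueueStatus waiting = TaskGroupQueueStatus.of(-1);
        System.out.println(waiting + " code=" + waiting.getCode() + " descp=" + waiting.getDescp());
        // any other code makes of() throw IllegalArgumentException("invalid status : ...")
    }
}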
......@@ -195,6 +195,11 @@ public class TaskDefinition {
@TableField(exist = false)
private String modifyBy;
/**
* task group id
*/
private int taskGroupId;
public TaskDefinition() {
}
......@@ -203,6 +208,14 @@ public class TaskDefinition {
this.version = version;
}
public int getTaskGroupId() {
return taskGroupId;
}
public void setTaskGroupId(int taskGroupId) {
this.taskGroupId = taskGroupId;
}
public String getName() {
return name;
}
......@@ -442,51 +455,51 @@ public class TaskDefinition {
}
TaskDefinition that = (TaskDefinition) o;
return failRetryTimes == that.failRetryTimes
&& failRetryInterval == that.failRetryInterval
&& timeout == that.timeout
&& delayTime == that.delayTime
&& Objects.equals(name, that.name)
&& Objects.equals(description, that.description)
&& Objects.equals(taskType, that.taskType)
&& Objects.equals(taskParams, that.taskParams)
&& flag == that.flag
&& taskPriority == that.taskPriority
&& Objects.equals(workerGroup, that.workerGroup)
&& timeoutFlag == that.timeoutFlag
&& timeoutNotifyStrategy == that.timeoutNotifyStrategy
&& Objects.equals(resourceIds, that.resourceIds)
&& environmentCode == that.environmentCode;
&& failRetryInterval == that.failRetryInterval
&& timeout == that.timeout
&& delayTime == that.delayTime
&& Objects.equals(name, that.name)
&& Objects.equals(description, that.description)
&& Objects.equals(taskType, that.taskType)
&& Objects.equals(taskParams, that.taskParams)
&& flag == that.flag
&& taskPriority == that.taskPriority
&& Objects.equals(workerGroup, that.workerGroup)
&& timeoutFlag == that.timeoutFlag
&& timeoutNotifyStrategy == that.timeoutNotifyStrategy
&& Objects.equals(resourceIds, that.resourceIds)
&& environmentCode == that.environmentCode;
}
@Override
public String toString() {
return "TaskDefinition{"
+ "id=" + id
+ ", code=" + code
+ ", name='" + name + '\''
+ ", version=" + version
+ ", description='" + description + '\''
+ ", projectCode=" + projectCode
+ ", userId=" + userId
+ ", taskType=" + taskType
+ ", taskParams='" + taskParams + '\''
+ ", taskParamList=" + taskParamList
+ ", taskParamMap=" + taskParamMap
+ ", flag=" + flag
+ ", taskPriority=" + taskPriority
+ ", userName='" + userName + '\''
+ ", projectName='" + projectName + '\''
+ ", workerGroup='" + workerGroup + '\''
+ ", failRetryTimes=" + failRetryTimes
+ ", environmentCode='" + environmentCode + '\''
+ ", failRetryInterval=" + failRetryInterval
+ ", timeoutFlag=" + timeoutFlag
+ ", timeoutNotifyStrategy=" + timeoutNotifyStrategy
+ ", timeout=" + timeout
+ ", delayTime=" + delayTime
+ ", resourceIds='" + resourceIds + '\''
+ ", createTime=" + createTime
+ ", updateTime=" + updateTime
+ '}';
+ "id=" + id
+ ", code=" + code
+ ", name='" + name + '\''
+ ", version=" + version
+ ", description='" + description + '\''
+ ", projectCode=" + projectCode
+ ", userId=" + userId
+ ", taskType=" + taskType
+ ", taskParams='" + taskParams + '\''
+ ", taskParamList=" + taskParamList
+ ", taskParamMap=" + taskParamMap
+ ", flag=" + flag
+ ", taskPriority=" + taskPriority
+ ", userName='" + userName + '\''
+ ", projectName='" + projectName + '\''
+ ", workerGroup='" + workerGroup + '\''
+ ", failRetryTimes=" + failRetryTimes
+ ", environmentCode='" + environmentCode + '\''
+ ", failRetryInterval=" + failRetryInterval
+ ", timeoutFlag=" + timeoutFlag
+ ", timeoutNotifyStrategy=" + timeoutNotifyStrategy
+ ", timeout=" + timeout
+ ", delayTime=" + delayTime
+ ", resourceIds='" + resourceIds + '\''
+ ", createTime=" + createTime
+ ", updateTime=" + updateTime
+ '}';
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.entity;
import java.io.Serializable;
import java.util.Date;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
/**
* Task Group
*/
@TableName("t_ds_task_group")
public class TaskGroup implements Serializable {
/**
* key
*/
@TableId(value = "id", type = IdType.AUTO)
private int id;
/**
* task_group name
*/
private String name;
private String description;
/**
* task group size
*/
private int groupSize;
/**
* used size of the task group
*/
private int useSize;
/**
* creator id
*/
private int userId;
/**
* 0 not available, 1 available
*/
private int status;
/**
* create time
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date createTime;
/**
* update time
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date updateTime;
public TaskGroup(String name, String description, int groupSize, int userId, int status) {
// apply defaults first so the caller-supplied status is not overwritten by init()
init();
this.name = name;
this.description = description;
this.groupSize = groupSize;
this.userId = userId;
this.status = status;
}
public TaskGroup() {
init();
}
public void init() {
this.status = 1;
this.useSize = 0;
}
@Override
public String toString() {
return "TaskGroup{"
+ "id=" + id
+ ", name='" + name + '\''
+ ", description='" + description + '\''
+ ", groupSize=" + groupSize
+ ", useSize=" + useSize
+ ", userId=" + userId
+ ", status=" + status
+ ", createTime=" + createTime
+ ", updateTime=" + updateTime
+ '}';
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public int getGroupSize() {
return groupSize;
}
public void setGroupSize(int groupSize) {
this.groupSize = groupSize;
}
public int getUseSize() {
return useSize;
}
public void setUseSize(int useSize) {
this.useSize = useSize;
}
public int getUserId() {
return userId;
}
public void setUserId(int userId) {
this.userId = userId;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.entity;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import java.io.Serializable;
import java.util.Date;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
/**
* Task Group Queue
*/
@TableName("t_ds_task_group_queue")
public class TaskGroupQueue implements Serializable {
/**
* key
*/
@TableId(value = "id", type = IdType.AUTO)
private int id;
/**
* task instance id
*/
private int taskId;
/**
* TaskInstance name
*/
private String taskName;
/**
* taskGroup id
*/
private int groupId;
/**
* process instance id
*/
private int processId;
/**
* the priority of task instance
*/
private int priority;
/**
* force start flag
* 0 NO, 1 YES
*/
private int forceStart;
/**
* in queue flag: ready to acquire the task group when another task releases it
* 0 NO, 1 YES
*/
private int inQueue;
/**
* -1: wait queue, 1: acquire success, 2: release
*/
private TaskGroupQueueStatus status;
/**
* create time
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date createTime;
/**
* update time
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date updateTime;
public TaskGroupQueue() {
}
public TaskGroupQueue(int taskId, String taskName, int groupId, int processId, int priority, TaskGroupQueueStatus status) {
this.taskId = taskId;
this.taskName = taskName;
this.groupId = groupId;
this.processId = processId;
this.priority = priority;
this.status = status;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getTaskId() {
return taskId;
}
public void setTaskId(int taskId) {
this.taskId = taskId;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public int getGroupId() {
return groupId;
}
public void setGroupId(int groupId) {
this.groupId = groupId;
}
public int getProcessId() {
return processId;
}
public void setProcessId(int processId) {
this.processId = processId;
}
public int getPriority() {
return priority;
}
public void setPriority(int priority) {
this.priority = priority;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
@Override
public String toString() {
return "TaskGroupQueue{"
+ "id=" + id
+ ", taskId=" + taskId
+ ", taskName='" + taskName + '\''
+ ", groupId=" + groupId
+ ", processId=" + processId
+ ", priority=" + priority
+ ", status=" + status
+ ", createTime=" + createTime
+ ", updateTime=" + updateTime
+ '}';
}
public TaskGroupQueueStatus getStatus() {
return status;
}
public void setStatus(TaskGroupQueueStatus status) {
this.status = status;
}
public int getForceStart() {
return forceStart;
}
public void setForceStart(int forceStart) {
this.forceStart = forceStart;
}
public int getInQueue() {
return inQueue;
}
public void setInQueue(int inQueue) {
this.inQueue = inQueue;
}
}
......@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.dao.entity;
import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Flag;
......@@ -39,6 +38,7 @@ import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.core.type.TypeReference;
/**
* task instance
......@@ -268,6 +268,10 @@ public class TaskInstance implements Serializable {
* dry run flag
*/
private int dryRun;
/**
* task group id
*/
private int taskGroupId;
public void init(String host, Date startTime, String executePath) {
this.host = host;
......@@ -283,6 +287,14 @@ public class TaskInstance implements Serializable {
this.varPool = varPool;
}
public int getTaskGroupId() {
return taskGroupId;
}
public void setTaskGroupId(int taskGroupId) {
this.taskGroupId = taskGroupId;
}
public ProcessInstance getProcessInstance() {
return processInstance;
}
......@@ -457,7 +469,8 @@ public class TaskInstance implements Serializable {
public DependentParameters getDependency() {
if (this.dependency == null) {
Map<String, Object> taskParamsMap = JSONUtils.parseObject(this.getTaskParams(), new TypeReference<Map<String, Object>>() {});
Map<String, Object> taskParamsMap = JSONUtils.parseObject(this.getTaskParams(), new TypeReference<Map<String, Object>>() {
});
this.dependency = JSONUtils.parseObject((String) taskParamsMap.get(Constants.DEPENDENCE), DependentParameters.class);
}
return this.dependency;
......@@ -469,15 +482,17 @@ public class TaskInstance implements Serializable {
public SwitchParameters getSwitchDependency() {
if (this.switchDependency == null) {
Map<String, Object> taskParamsMap = JSONUtils.parseObject(this.getTaskParams(), new TypeReference<Map<String, Object>>() {});
Map<String, Object> taskParamsMap = JSONUtils.parseObject(this.getTaskParams(), new TypeReference<Map<String, Object>>() {
});
this.switchDependency = JSONUtils.parseObject((String) taskParamsMap.get(Constants.SWITCH_RESULT), SwitchParameters.class);
}
return this.switchDependency;
}
public void setSwitchDependency(SwitchParameters switchDependency) {
Map<String, Object> taskParamsMap = JSONUtils.parseObject(this.getTaskParams(), new TypeReference<Map<String, Object>>() {});
taskParamsMap.put(Constants.SWITCH_RESULT,JSONUtils.toJsonString(switchDependency));
Map<String, Object> taskParamsMap = JSONUtils.parseObject(this.getTaskParams(), new TypeReference<Map<String, Object>>() {
});
taskParamsMap.put(Constants.SWITCH_RESULT, JSONUtils.toJsonString(switchDependency));
this.setTaskParams(JSONUtils.toJsonString(taskParamsMap));
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.TaskGroup;
import org.apache.ibatis.annotations.Param;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
/**
* the DAO interface of task group
*
* @author yinrui
* @since 2021-08-07
*/
public interface TaskGroupMapper extends BaseMapper<TaskGroup> {
/**
* compare-and-set update on the task group table: acquire one slot only while use_size is below group_size
*
* @param id primary key
* @param queueId task group queue id
* @param queueStatus expected status of the queue record
* @return affected rows, 1 when the slot was acquired
*/
int updateTaskGroupResource(@Param("id") int id, @Param("queueId") int queueId,
@Param("queueStatus") int queueStatus);
/**
* release one slot of the task group
*
* @param id primary key
* @param useSize current use size
* @param queueId task group queue id
* @param queueStatus expected status of the queue record
* @return affected rows
*/
int releaseTaskGroupResource(@Param("id") int id, @Param("useSize") int useSize,
@Param("queueId") int queueId, @Param("queueStatus") int queueStatus);
/**
* select task groups paging
*
* @param page page
* @param userId user id
* @param name name
* @param status status
* @return result page
*/
IPage<TaskGroup> queryTaskGroupPaging(IPage<TaskGroup> page, @Param("userId") int userId,
@Param("name") String name, @Param("status") int status);
/**
* query by task group name
*
* @param userId user id
* @param name name
* @return task group
*/
TaskGroup queryByName(@Param("userId") int userId, @Param("name") String name);
int selectAvailableCountById(@Param("groupId") int groupId);
int selectCountByIdStatus(@Param("id") int id, @Param("status") int status);
}
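A hedged sketch of how a caller might treat the two conditional updates above as compare-and-set operations; the helper class and its method names are illustrative, only the mapper methods come from this interface.
import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;

// illustrative helper, not part of the patch
public class TaskGroupSlotHelper {

    private final TaskGroupMapper taskGroupMapper;

    public TaskGroupSlotHelper(TaskGroupMapper taskGroupMapper) {
        this.taskGroupMapper = taskGroupMapper;
    }

    /**
     * The UPDATE only fires when use_size is below group_size and the queue row has the expected
     * status, so exactly one affected row means the slot was acquired.
     */
    public boolean tryAcquireSlot(int groupId, int queueId, int expectedQueueStatus) {
        return taskGroupMapper.updateTaskGroupResource(groupId, queueId, expectedQueueStatus) == 1;
    }

    /** Symmetric release; the WHERE clause guards use_size > 0. */
    public boolean releaseSlot(int groupId, int useSize, int queueId, int expectedQueueStatus) {
        return taskGroupMapper.releaseTaskGroupResource(groupId, useSize, queueId, expectedQueueStatus) == 1;
    }
}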
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
/**
* the DAO interface of task group queue
*
* @author yinrui
* @since 2021-08-07
*/
public interface TaskGroupQueueMapper extends BaseMapper<TaskGroupQueue> {
/**
* select task group queues of a group, paged
*
* @param page page
* @param groupId group id
* @return task group queue list
*/
IPage<TaskGroupQueue> queryTaskGroupQueuePaging(IPage<TaskGroupQueue> page,
@Param("groupId") int groupId
);
TaskGroupQueue queryByTaskId(@Param("taskId") int taskId);
/**
* query by status
*
* @param status status
* @return result
*/
List<TaskGroupQueue> queryByStatus(@Param("status") int status);
/**
* delete by task id
*
* @param taskId task id
* @return affected rows
*/
int deleteByTaskId(@Param("taskId") int taskId);
/**
* update status by task id
*
* @param taskId task id
* @param status status
* @return affected rows
*/
int updateStatusByTaskId(@Param("taskId") int taskId, @Param("status") int status);
List<TaskGroupQueue> queryHighPriorityTasks(@Param("groupId") int groupId, @Param("priority") int priority, @Param("status") int status);
TaskGroupQueue queryTheHighestPriorityTasks(@Param("groupId") int groupId, @Param("status") int status,
@Param("forceStart") int forceStart, @Param("inQueue") int inQueue);
void updateInQueue(@Param("inQueue") int inQueue, @Param("id") int id);
void updateForceStart(@Param("taskId") int taskId, @Param("forceStart") int forceStart);
int updateInQueueLimit1(@Param("oldValue") int oldValue, @Param("newValue") int newValue,
@Param("groupId") int groupId, @Param("status") int status);
int updateInQueueCAS(@Param("oldValue") int oldValue, @Param("newValue") int newValue, @Param("id") int id);
}
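A hedged sketch of a wake-up step built on the queue mapper above; the class name is illustrative, and the literal -1/0/1 arguments simply mirror the codes documented in TaskGroupQueueStatus and TaskGroupQueue.
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;

// illustrative sketch, not part of the patch
public class TaskGroupWakeUpSketch {

    private final TaskGroupQueueMapper taskGroupQueueMapper;

    public TaskGroupWakeUpSketch(TaskGroupQueueMapper taskGroupQueueMapper) {
        this.taskGroupQueueMapper = taskGroupQueueMapper;
    }

    /**
     * Pick the highest-priority waiting entry of a group and flag it as in-queue.
     * Arguments: status -1 = wait queue, forceStart 0 = NO, inQueue 0 = NO.
     */
    public TaskGroupQueue pollNextWaiting(int groupId) {
        TaskGroupQueue candidate = taskGroupQueueMapper.queryTheHighestPriorityTasks(groupId, -1, 0, 0);
        if (candidate == null) {
            return null;
        }
        // only one caller wins the in_queue CAS from 0 to 1
        return taskGroupQueueMapper.updateInQueueCAS(0, 1, candidate.getId()) == 1 ? candidate : null;
    }
}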
......@@ -73,4 +73,6 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
@Param("startTime") Date startTime,
@Param("endTime") Date endTime
);
List<TaskInstance> loadAllInfosNoRelease(@Param("processInstanceId") int processInstanceId, @Param("status") int status);
}
<?xml version="1.0" encoding="UTF-8" ?>
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
......@@ -16,12 +16,12 @@
~ limitations under the License.
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper">
<sql id="baseSql">
id, code, name, version, description, project_code, user_id, task_type, task_params, flag, task_priority,
worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time,
resource_ids, operator, operate_time, create_time, update_time
resource_ids, operator, operate_time, create_time, update_time, task_group_id
</sql>
<select id="queryMaxVersionForDefinition" resultType="java.lang.Integer">
select max(version)
......@@ -51,7 +51,7 @@
<insert id="batchInsert">
insert into t_ds_task_definition_log (code, name, version, description, project_code, user_id,
task_type, task_params, flag, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval,
timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, operator, operate_time, create_time, update_time)
timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, operator, operate_time, create_time, update_time, task_group_id)
values
<foreach collection="taskDefinitionLogs" item="taskDefinitionLog" separator=",">
(#{taskDefinitionLog.code},#{taskDefinitionLog.name},#{taskDefinitionLog.version},#{taskDefinitionLog.description},
......@@ -59,7 +59,7 @@
#{taskDefinitionLog.flag},#{taskDefinitionLog.taskPriority},#{taskDefinitionLog.workerGroup},#{taskDefinitionLog.environmentCode},
#{taskDefinitionLog.failRetryTimes},#{taskDefinitionLog.failRetryInterval},#{taskDefinitionLog.timeoutFlag},#{taskDefinitionLog.timeoutNotifyStrategy},
#{taskDefinitionLog.timeout},#{taskDefinitionLog.delayTime},#{taskDefinitionLog.resourceIds},#{taskDefinitionLog.operator},#{taskDefinitionLog.operateTime},
#{taskDefinitionLog.createTime},#{taskDefinitionLog.updateTime})
#{taskDefinitionLog.createTime},#{taskDefinitionLog.updateTime}, #{taskDefinitionLog.taskGroupId})
</foreach>
</insert>
<delete id="deleteByCodeAndVersion">
......
<?xml version="1.0" encoding="UTF-8" ?>
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
......@@ -16,12 +16,12 @@
~ limitations under the License.
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper">
<sql id="baseSql">
id, code, name, version, description, project_code, user_id, task_type, task_params, flag, task_priority,
worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time,
resource_ids, create_time, update_time
resource_ids, create_time, update_time, task_group_id
</sql>
<select id="queryByName" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
select
......@@ -76,14 +76,14 @@
<insert id="batchInsert">
insert into t_ds_task_definition (code, name, version, description, project_code, user_id,
task_type, task_params, flag, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval,
timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, create_time, update_time)
timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, create_time, update_time, task_group_id)
values
<foreach collection="taskDefinitions" item="taskDefinition" separator=",">
(#{taskDefinition.code},#{taskDefinition.name},#{taskDefinition.version},#{taskDefinition.description},
#{taskDefinition.projectCode},#{taskDefinition.userId},#{taskDefinition.taskType},#{taskDefinition.taskParams},#{taskDefinition.flag},
#{taskDefinition.taskPriority},#{taskDefinition.workerGroup},#{taskDefinition.environmentCode},#{taskDefinition.failRetryTimes},
#{taskDefinition.failRetryInterval},#{taskDefinition.timeoutFlag},#{taskDefinition.timeoutNotifyStrategy},#{taskDefinition.timeout},
#{taskDefinition.delayTime},#{taskDefinition.resourceIds},#{taskDefinition.createTime},#{taskDefinition.updateTime})
#{taskDefinition.delayTime},#{taskDefinition.resourceIds},#{taskDefinition.createTime},#{taskDefinition.updateTime}, #{taskDefinition.taskGroupId})
</foreach>
</insert>
<select id="queryDefineListPaging" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper">
<resultMap type="org.apache.dolphinscheduler.dao.entity.TaskGroup" id="TaskGroupMap">
<result property="id" column="id" jdbcType="INTEGER"/>
<result property="name" column="name" jdbcType="VARCHAR"/>
<result property="description" column="description" jdbcType="VARCHAR"/>
<result property="groupSize" column="group_size" jdbcType="INTEGER"/>
<result property="useSize" column="use_size" jdbcType="INTEGER"/>
<result property="userId" column="user_id" jdbcType="INTEGER"/>
<result property="status" column="status" jdbcType="INTEGER"/>
<result property="createTime" column="create_time" jdbcType="TIMESTAMP"/>
<result property="updateTime" column="update_time" jdbcType="TIMESTAMP"/>
</resultMap>
<sql id = "baseSql">
id,name,description,group_size,use_size,create_time,update_time
</sql>
<select id="queryTaskGroupPaging" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroup">
select
<include refid="baseSql">
</include>
from t_ds_task_group
<where>
<if test="userId != 0">
and user_id = #{userId}
</if>
<if test="status != 0">
and status = #{status}
</if>
<if test="name != null and name != '' ">
and name like concat('%', #{name}, '%')
</if>
</where>
</select>
<!-- acquire one slot: increase use_size only while it is below group_size and the queue record matches -->
<update id="updateTaskGroupResource">
update t_ds_task_group
set use_size = use_size+1
where id = #{id} and use_size &lt; group_size and
(select count(1) FROM t_ds_task_group_queue where id = #{queueId} and status = #{queueStatus} ) = 1
</update>
<!-- release one slot: decrease use_size only while it is above zero and the queue record matches -->
<update id="releaseTaskGroupResource">
update t_ds_task_group
set use_size = use_size-1
where id = #{id} and use_size > 0 and
(select count(1) FROM t_ds_task_group_queue where id = #{queueId} and status = #{queueStatus} ) = 1
</update>
<select id="queryByName" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroup">
select
<include refid="baseSql" />
from t_ds_task_group
where
user_id = #{userId} and name = #{name}
</select>
<select id="selectAvailableCountById" resultType="java.lang.Integer">
select
count(1)
from t_ds_task_group
where
id = #{groupId} and use_size &lt; group_size
</select>
<select id="selectCountByIdStatus" resultType="java.lang.Integer">
select
count(1)
from t_ds_task_group
where
id = #{id} and status = #{status}
</select>
</mapper>
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper">
<resultMap type="org.apache.dolphinscheduler.dao.entity.TaskGroupQueue" id="TaskGroupQueueMap">
<result property="id" column="id" jdbcType="INTEGER"/>
<result property="taskId" column="task_id" jdbcType="INTEGER"/>
<result property="taskName" column="task_name" jdbcType="VARCHAR"/>
<result property="groupId" column="group_id" jdbcType="INTEGER"/>
<result property="processId" column="process_id" jdbcType="INTEGER"/>
<result property="priority" column="priority" jdbcType="INTEGER"/>
<result property="status" column="status" jdbcType="INTEGER"/>
<result property="forceStart" column="force_start" jdbcType="INTEGER"/>
<result property="inQueue" column="in_queue" jdbcType="INTEGER"/>
<result property="createTime" column="create_time" jdbcType="TIMESTAMP"/>
<result property="updateTime" column="update_time" jdbcType="TIMESTAMP"/>
</resultMap>
<sql id = "baseSql">
id, task_id, task_name, group_id, process_id, priority, status , force_start , in_queue, create_time, update_time
</sql>
<select id="queryTaskGroupQueuePaging" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroupQueue">
select
<include refid="baseSql">
</include>
from t_ds_task_group_queue
<where>
<if test="groupId != 0">
and group_id = #{groupId}
</if>
</where>
</select>
<select id="queryByStatus" resultMap="TaskGroupQueueMap" resultType="map">
select
<include refid="baseSql">
</include>
from t_ds_task_group_queue
where status = #{status}
</select>
<delete id="deleteByTaskId">
delete from t_ds_task_group_queue
where task_id = #{taskId}
</delete>
<update id="updateStatusByTaskId">
update t_ds_task_group_queue
<set>
<if test="status != 0">
status = #{status},
</if>
</set>
where task_id = #{taskId}
</update>
<update id="updateInQueue">
update t_ds_task_group_queue
set in_queue = #{inQueue}
where id = #{id}
</update>
<update id="updateForceStart">
update t_ds_task_group_queue
set force_start = #{forceStart}
where task_id = #{taskId}
</update>
<update id="updateInQueueLimit1">
update t_ds_task_group_queue
set in_queue = #{newValue}
where group_id = #{groupId} and in_queue = #{oldValue} and status = #{status} order by priority desc limit 1
</update>
<update id="updateInQueueCAS">
update t_ds_task_group_queue
set in_queue = #{newValue}
where id = #{id} and in_queue = #{oldValue}
</update>
<select id="queryHighPriorityTasks" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroupQueue">
select
<include refid="baseSql" />
from t_ds_task_group_queue
where group_id = #{groupId} and status = #{status} and priority &gt; #{priority}
</select>
<select id="queryTheHighestPriorityTasks" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroupQueue">
select
<include refid="baseSql" />
from t_ds_task_group_queue
where priority = (select max(priority) from t_ds_task_group_queue where group_id = #{groupId}
and status = #{status} and in_queue = #{inQueue} and force_start = #{forceStart} ) and group_id = #{groupId}
and status = #{status} and in_queue = #{inQueue} and force_start = #{forceStart} limit 1
</select>
<select id="queryByTaskId" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroupQueue">
select
<include refid="baseSql" />
from t_ds_task_group_queue
where task_id = #{taskId}
</select>
</mapper>
<?xml version="1.0" encoding="UTF-8" ?>
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
......@@ -16,19 +16,19 @@
~ limitations under the License.
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper">
<sql id="baseSql">
id, name, task_type, process_instance_id, task_code, task_definition_version, state, submit_time,
start_time, end_time, host, execute_path, log_path, alert_flag, retry_times, pid, app_link,
flag, retry_interval, max_retry_times, task_instance_priority, worker_group,environment_code , executor_id,
first_submit_time, delay_time, task_params, var_pool, dry_run
first_submit_time, delay_time, task_params, var_pool, dry_run, task_group_id
</sql>
<sql id="baseSqlV2">
${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.task_code, ${alias}.task_definition_version, ${alias}.process_instance_id, ${alias}.state, ${alias}.submit_time,
${alias}.start_time, ${alias}.end_time, ${alias}.host, ${alias}.execute_path, ${alias}.log_path, ${alias}.alert_flag, ${alias}.retry_times, ${alias}.pid, ${alias}.app_link,
${alias}.flag, ${alias}.retry_interval, ${alias}.max_retry_times, ${alias}.task_instance_priority, ${alias}.worker_group,${alias}.environment_code , ${alias}.executor_id,
${alias}.first_submit_time, ${alias}.delay_time, ${alias}.task_params, ${alias}.var_pool, ${alias}.dry_run
${alias}.first_submit_time, ${alias}.delay_time, ${alias}.task_params, ${alias}.var_pool, ${alias}.dry_run, ${alias}.task_group_id
</sql>
<update id="setFailoverByHostAndStateArray">
update t_ds_task_instance
......@@ -162,4 +162,14 @@
</if>
order by instance.start_time desc
</select>
<select id="loadAllInfosNoRelease" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSqlV2">
<property name="alias" value="instance"/>
</include>
from t_ds_task_instance instance
left join t_ds_task_group_queue que on instance.id = que.task_id
where instance.process_instance_id = #{processInstanceId}
and que.status = #{status}
</select>
</mapper>
......@@ -477,6 +477,7 @@ CREATE TABLE t_ds_task_definition
timeout_notify_strategy tinyint(4) DEFAULT NULL,
timeout int(11) DEFAULT '0',
delay_time int(11) DEFAULT '0',
task_group_id int(11) DEFAULT NULL,
resource_ids text,
create_time datetime NOT NULL,
update_time datetime DEFAULT NULL,
......@@ -510,6 +511,7 @@ CREATE TABLE t_ds_task_definition_log
delay_time int(11) DEFAULT '0',
resource_ids text,
operator int(11) DEFAULT NULL,
task_group_id int(11) DEFAULT NULL,
operate_time datetime DEFAULT NULL,
create_time datetime NOT NULL,
update_time datetime DEFAULT NULL,
......@@ -844,6 +846,7 @@ CREATE TABLE t_ds_task_instance
executor_id int(11) DEFAULT NULL,
first_submit_time datetime DEFAULT NULL,
delay_time int(4) DEFAULT '0',
task_group_id int(11) DEFAULT NULL,
var_pool longtext,
dry_run int NULL DEFAULT 0,
PRIMARY KEY (id),
......@@ -1040,3 +1043,35 @@ CREATE TABLE t_ds_environment_worker_group_relation
PRIMARY KEY (id),
UNIQUE KEY environment_worker_group_unique (environment_code,worker_group)
);
DROP TABLE IF EXISTS t_ds_task_group_queue;
CREATE TABLE t_ds_task_group_queue
(
id int(11) NOT NULL AUTO_INCREMENT ,
task_id int(11) DEFAULT NULL ,
task_name VARCHAR(100) DEFAULT NULL ,
group_id int(11) DEFAULT NULL ,
process_id int(11) DEFAULT NULL ,
priority int(8) DEFAULT '0' ,
status int(4) DEFAULT '-1' ,
force_start int(4) DEFAULT '0' ,
in_queue int(4) DEFAULT '0' ,
create_time datetime DEFAULT NULL ,
update_time datetime DEFAULT NULL ,
PRIMARY KEY (id)
);
DROP TABLE IF EXISTS t_ds_task_group;
CREATE TABLE t_ds_task_group
(
id int(11) NOT NULL AUTO_INCREMENT ,
name varchar(100) DEFAULT NULL ,
description varchar(200) DEFAULT NULL ,
group_size int(11) NOT NULL ,
use_size int(11) DEFAULT '0' ,
user_id int(11) DEFAULT NULL ,
project_id int(11) DEFAULT NULL ,
status int(4) DEFAULT '1' ,
create_time datetime DEFAULT NULL ,
update_time datetime DEFAULT NULL ,
PRIMARY KEY(id)
);
......@@ -477,6 +477,7 @@ CREATE TABLE `t_ds_task_definition` (
`timeout` int(11) DEFAULT '0' COMMENT 'timeout length,unit: minute',
`delay_time` int(11) DEFAULT '0' COMMENT 'delay execution time,unit: minute',
`resource_ids` text COMMENT 'resource id, separated by comma',
`task_group_id` int(11) DEFAULT NULL COMMENT 'task group id',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime NOT NULL COMMENT 'update time',
PRIMARY KEY (`id`,`code`)
......@@ -508,6 +509,7 @@ CREATE TABLE `t_ds_task_definition_log` (
`delay_time` int(11) DEFAULT '0' COMMENT 'delay execution time,unit: minute',
`resource_ids` text DEFAULT NULL COMMENT 'resource id, separated by comma',
`operator` int(11) DEFAULT NULL COMMENT 'operator user id',
`task_group_id` int(11) DEFAULT NULL COMMENT 'task group id',
`operate_time` datetime DEFAULT NULL COMMENT 'operate time',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime NOT NULL COMMENT 'update time',
......@@ -832,6 +834,7 @@ CREATE TABLE `t_ds_task_instance` (
`first_submit_time` datetime DEFAULT NULL COMMENT 'task first submit time',
`delay_time` int(4) DEFAULT '0' COMMENT 'task delay execution time',
`var_pool` longtext COMMENT 'var_pool',
`task_group_id` int(11) DEFAULT NULL COMMENT 'task group id',
`dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flag: 0 normal, 1 dry run',
PRIMARY KEY (`id`),
KEY `process_instance_id` (`process_instance_id`) USING BTREE,
......@@ -1017,3 +1020,40 @@ CREATE TABLE `t_ds_environment_worker_group_relation` (
PRIMARY KEY (`id`),
UNIQUE KEY `environment_worker_group_unique` (`environment_code`,`worker_group`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for t_ds_task_group_queue
-- ----------------------------
DROP TABLE IF EXISTS `t_ds_task_group_queue`;
CREATE TABLE `t_ds_task_group_queue` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
`task_id` int(11) DEFAULT NULL COMMENT 'task instance id',
`task_name` varchar(100) DEFAULT NULL COMMENT 'task instance name',
`group_id` int(11) DEFAULT NULL COMMENT 'task group id',
`process_id` int(11) DEFAULT NULL COMMENT 'process instance id',
`priority` int(8) DEFAULT '0' COMMENT 'priority',
`status` tinyint(4) DEFAULT '-1' COMMENT '-1: wait queue, 1: acquire success, 2: release',
`force_start` tinyint(4) DEFAULT '0' COMMENT 'is force start: 0 NO, 1 YES',
`in_queue` tinyint(4) DEFAULT '0' COMMENT 'ready to acquire the group when another task releases it: 0 NO, 1 YES',
`create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for t_ds_task_group
-- ----------------------------
DROP TABLE IF EXISTS `t_ds_task_group`;
CREATE TABLE `t_ds_task_group` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
`name` varchar(100) DEFAULT NULL COMMENT 'task group name',
`description` varchar(200) DEFAULT NULL,
`group_size` int(11) NOT NULL COMMENT 'group size',
`use_size` int(11) DEFAULT '0' COMMENT 'used size',
`user_id` int(11) DEFAULT NULL COMMENT 'creator id',
`project_id` int(11) DEFAULT NULL COMMENT 'project id',
`status` tinyint(4) DEFAULT '1' COMMENT '0 not available, 1 available',
`create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
......@@ -385,6 +385,7 @@ CREATE TABLE t_ds_task_definition (
timeout_notify_strategy int DEFAULT NULL ,
timeout int DEFAULT '0' ,
delay_time int DEFAULT '0' ,
task_group_id int DEFAULT NULL,
resource_ids text ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
......@@ -416,6 +417,7 @@ CREATE TABLE t_ds_task_definition_log (
delay_time int DEFAULT '0' ,
resource_ids text ,
operator int DEFAULT NULL ,
task_group_id int DEFAULT NULL,
operate_time timestamp DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
......@@ -713,6 +715,7 @@ CREATE TABLE t_ds_task_instance (
executor_id int DEFAULT NULL ,
first_submit_time timestamp DEFAULT NULL ,
delay_time int DEFAULT '0' ,
task_group_id int DEFAULT NULL,
var_pool text ,
dry_run int DEFAULT '0' ,
PRIMARY KEY (id),
......@@ -989,3 +992,34 @@ CREATE TABLE t_ds_environment_worker_group_relation (
PRIMARY KEY (id) ,
CONSTRAINT environment_worker_group_unique UNIQUE (environment_code,worker_group)
);
DROP TABLE IF EXISTS t_ds_task_group_queue;
CREATE TABLE t_ds_task_group_queue (
id serial NOT NULL,
task_id int DEFAULT NULL ,
task_name VARCHAR(100) DEFAULT NULL ,
group_id int DEFAULT NULL ,
process_id int DEFAULT NULL ,
priority int DEFAULT '0' ,
status int DEFAULT '-1' ,
force_start int DEFAULT '0' ,
in_queue int DEFAULT '0' ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
);
DROP TABLE IF EXISTS t_ds_task_group;
CREATE TABLE t_ds_task_group (
id serial NOT NULL,
name varchar(100) DEFAULT NULL ,
description varchar(200) DEFAULT NULL ,
group_size int NOT NULL ,
use_size int DEFAULT '0' ,
user_id int DEFAULT NULL ,
project_id int DEFAULT NULL ,
status int DEFAULT '1' ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY(id)
);
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.BaseDaoTest;
import org.apache.dolphinscheduler.dao.entity.TaskGroup;
import java.util.Date;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
public class TaskGroupMapperTest extends BaseDaoTest {
private static final Logger logger = LoggerFactory.getLogger(TaskGroupMapperTest.class);
@Autowired
TaskGroupMapper taskGroupMapper;
/**
* insert one task group for the tests
*/
public TaskGroup insertOne() {
TaskGroup taskGroup = new TaskGroup();
taskGroup.setName("task group");
taskGroup.setUserId(1);
taskGroup.setStatus(1);
taskGroup.setGroupSize(10);
taskGroup.setDescription("this is a task group");
Date date = new Date(System.currentTimeMillis());
taskGroup.setCreateTime(date);
taskGroup.setUpdateTime(date);
taskGroupMapper.insert(taskGroup);
return taskGroup;
}
/**
* test update
*/
@Test
public void testUpdate() {
TaskGroup taskGroup = insertOne();
taskGroup.setGroupSize(100);
taskGroup.setUpdateTime(new Date(System.currentTimeMillis()));
int i = taskGroupMapper.updateById(taskGroup);
Assert.assertEquals(i, 1);
}
/**
* test CheckName
*/
@Test
public void testCheckName() {
TaskGroup taskGroup = insertOne();
TaskGroup result = taskGroupMapper.queryByName(taskGroup.getUserId(), taskGroup.getName());
Assert.assertNotNull(result);
}
/**
* test queryTaskGroupPaging
*/
@Test
public void testQueryTaskGroupPaging() {
TaskGroup taskGroup = insertOne();
Page<TaskGroup> page = new Page(1, 3);
IPage<TaskGroup> taskGroupIPage = taskGroupMapper.queryTaskGroupPaging(
page,
taskGroup.getUserId(),
taskGroup.getName(), taskGroup.getStatus());
Assert.assertEquals(taskGroupIPage.getTotal(), 1);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.dao.BaseDaoTest;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import java.util.Date;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
public class TaskGroupQueueMapperTest extends BaseDaoTest {
@Autowired
TaskGroupQueueMapper taskGroupQueueMapper;
int userId = 1;
public TaskGroupQueue insertOne() {
TaskGroupQueue taskGroupQueue = new TaskGroupQueue();
taskGroupQueue.setTaskName("task1");
taskGroupQueue.setGroupId(10);
taskGroupQueue.setProcessId(11);
taskGroupQueue.setPriority(10);
taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
Date date = new Date(System.currentTimeMillis());
taskGroupQueue.setCreateTime(date);
taskGroupQueue.setUpdateTime(date);
taskGroupQueueMapper.insert(taskGroupQueue);
return taskGroupQueue;
}
/**
* test update
*/
@Test
public void testUpdate() {
TaskGroupQueue taskGroupQueue = insertOne();
taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
taskGroupQueue.setUpdateTime(new Date(System.currentTimeMillis()));
int i = taskGroupQueueMapper.updateById(taskGroupQueue);
Assert.assertEquals(i, 1);
}
/**
* test delete
*/
@Test
public void testDelete() {
TaskGroupQueue taskGroupQueue = insertOne();
int i = taskGroupQueueMapper.deleteByTaskId(taskGroupQueue.getId());
Assert.assertEquals(i, 0);
}
/**
* test select
*/
@Test
public void testSelect() {
TaskGroupQueue taskGroupQueue = insertOne();
TaskGroupQueue result = taskGroupQueueMapper.selectById(taskGroupQueue.getId());
Assert.assertEquals(result.getTaskName(), "task1");
List<TaskGroupQueue> taskGroupQueues = taskGroupQueueMapper.queryByStatus(taskGroupQueue.getStatus().getCode());
Assert.assertEquals(taskGroupQueues.size(), 1);
}
@Test
public void testUpdateStatusByTaskId() {
TaskGroupQueue taskGroupQueue = insertOne();
int i = taskGroupQueueMapper.updateStatusByTaskId(taskGroupQueue.getTaskId(), 7);
Assert.assertEquals(i, 1);
}
@Test
public void testDeleteByTaskId() {
TaskGroupQueue taskGroupQueue = insertOne();
int i = taskGroupQueueMapper.deleteByTaskId(taskGroupQueue.getTaskId());
Assert.assertEquals(i, 1);
}
}
......@@ -137,9 +137,16 @@ public enum CommandType {
* state event request
*/
STATE_EVENT_REQUEST,
/**
* cache expire
*/
CACHE_EXPIRE;
CACHE_EXPIRE,
/**
* task force start event request
*/
TASK_FORCE_STATE_EVENT_REQUEST,
/**
* task wake up event request
*/
TASK_WAKEUP_EVENT_REQUEST;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
/**
* task event change command
*/
public class TaskEventChangeCommand implements Serializable {
private String key;
private int processInstanceId;
private int taskInstanceId;
public TaskEventChangeCommand() {
super();
}
public TaskEventChangeCommand(
int processInstanceId,
int taskInstanceId
) {
this.key = String.format("%d-%d",
processInstanceId,
taskInstanceId);
this.processInstanceId = processInstanceId;
this.taskInstanceId = taskInstanceId;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
/**
* package the task event change into a remote command
*
* @param commandType command type
* @return command
*/
public Command convert2Command(CommandType commandType) {
Command command = new Command();
command.setType(commandType);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "TaskEventChangeCommand{"
+ "key=" + key
+ '}';
}
public int getProcessInstanceId() {
return processInstanceId;
}
public void setProcessInstanceId(int processInstanceId) {
this.processInstanceId = processInstanceId;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
}
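A short sketch (not part of the patch) showing how the command above can be packaged for the two new command types; the ids are placeholders, and only the constructor and convert2Command come from this file.
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand;

public class TaskEventChangeCommandExample {
    public static void main(String[] args) {
        // key becomes "100-200"; the receiving TaskEventProcessor reads both ids back from the JSON body
        TaskEventChangeCommand change = new TaskEventChangeCommand(100, 200);
        Command wakeUp = change.convert2Command(CommandType.TASK_WAKEUP_EVENT_REQUEST);
        Command forceStart = change.convert2Command(CommandType.TASK_FORCE_STATE_EVENT_REQUEST);
        System.out.println(change + " -> " + wakeUp.getType() + ", " + forceStart.getType());
    }
}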
......@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.CacheProcessor;
import org.apache.dolphinscheduler.server.master.processor.StateEventProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskEventProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
......@@ -95,6 +96,8 @@ public class MasterServer implements IStoppable {
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, new StateEventProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, new TaskEventProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, new TaskEventProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, new CacheProcessor());
this.nettyRemotingServer.start();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.master.processor.queue.StateEventResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
/**
* handle task force-start and wake-up events received from master/api
*/
public class TaskEventProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskEventProcessor.class);
private StateEventResponseService stateEventResponseService;
public TaskEventProcessor() {
stateEventResponseService = SpringApplicationContext.getBean(StateEventResponseService.class);
}
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_FORCE_STATE_EVENT_REQUEST == command.getType()
|| CommandType.TASK_WAKEUP_EVENT_REQUEST == command.getType(),
String.format("invalid command type: %s", command.getType()));
TaskEventChangeCommand taskEventChangeCommand = JSONUtils.parseObject(command.getBody(), TaskEventChangeCommand.class);
StateEvent stateEvent = new StateEvent();
stateEvent.setKey(taskEventChangeCommand.getKey());
stateEvent.setProcessInstanceId(taskEventChangeCommand.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskEventChangeCommand.getTaskInstanceId());
stateEvent.setType(StateEventType.WAIT_TASK_GROUP);
logger.info("received command : {}", stateEvent);
stateEventResponseService.addEvent2WorkflowExecute(stateEvent);
}
}
......@@ -149,6 +149,10 @@ public class StateEventResponseService {
}
}
public void addEvent2WorkflowExecute(StateEvent stateEvent) {
WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
workflowExecuteThread.addStateEvent(stateEvent);
}
public BlockingQueue<StateEvent> getEventQueue() {
return eventQueue;
}
......
......@@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.graph.DAG;
......@@ -53,6 +54,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.remote.command.HostUpdateCommand;
......@@ -316,6 +318,9 @@ public class WorkflowExecuteThread implements Runnable {
case TASK_TIMEOUT:
result = taskTimeout(stateEvent);
break;
case WAIT_TASK_GROUP:
result = checkForceStartAndWakeUp(stateEvent);
break;
default:
break;
}
......@@ -326,6 +331,29 @@ public class WorkflowExecuteThread implements Runnable {
return result;
}
private boolean checkForceStartAndWakeUp(StateEvent stateEvent) {
TaskGroupQueue taskGroupQueue = this.processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId());
if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) {
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
ProcessInstance processInstance = this.processService.findProcessInstanceById(taskInstance.getProcessInstanceId());
taskProcessor.dispatch(taskInstance, processInstance);
this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getId(), TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode());
return true;
}
if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) {
boolean acquireTaskGroup = processService.acquireTaskGroupAgain(taskGroupQueue);
if (acquireTaskGroup) {
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
ProcessInstance processInstance = this.processService.findProcessInstanceById(taskInstance.getProcessInstanceId());
taskProcessor.dispatch(taskInstance, processInstance);
return true;
}
}
return false;
}
private boolean taskTimeout(StateEvent stateEvent) {
if (!checkTaskInstanceByStateEvent(stateEvent)) {
return true;
......@@ -362,8 +390,25 @@ public class WorkflowExecuteThread implements Runnable {
return true;
}
if (task.getState().typeIsFinished()) {
if (task.getState().typeIsFinished() && !completeTaskMap.containsKey(Long.toString(task.getTaskCode()))) {
taskFinished(task);
if (task.getTaskGroupId() > 0) {
//release task group
TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(task);
if (nextTaskInstance != null) {
if (nextTaskInstance.getProcessInstanceId() == task.getProcessInstanceId()) {
StateEvent nextEvent = new StateEvent();
nextEvent.setProcessInstanceId(this.processInstance.getId());
nextEvent.setTaskInstanceId(nextTaskInstance.getId());
nextEvent.setType(StateEventType.WAIT_TASK_GROUP);
this.stateEvents.add(nextEvent);
} else {
ProcessInstance processInstance = this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId());
this.processService.sendStartTask2Master(processInstance,nextTaskInstance.getId(),
org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST);
}
}
}
} else if (activeTaskProcessorMaps.containsKey(stateEvent.getTaskInstanceId())) {
ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
iTaskProcessor.run();
......@@ -615,6 +660,11 @@ public class WorkflowExecuteThread implements Runnable {
ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), projectUser);
}
List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(processInstance.getId());
ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
processAlertManager.sendAlertProcessInstance(processInstance, taskInstances, projectUser);
//release task group
processService.releaseAllTaskGroup(processInstance.getId());
}
public void checkSerialProcess(ProcessDefinition processDefinition) {
......@@ -740,7 +790,10 @@ public class WorkflowExecuteThread implements Runnable {
&& taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
notifyProcessHostUpdate(taskInstance);
}
TaskDefinition taskDefinition = processService.findTaskDefinition(
taskInstance.getTaskCode(),
taskInstance.getTaskDefinitionVersion());
taskInstance.setTaskGroupId(taskDefinition.getTaskGroupId());
// package task instance before submit
processService.packageTaskInstance(taskInstance, processInstance);
......
......@@ -157,6 +157,11 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
return null;
}
@Override
public void dispatch(TaskInstance taskInstance, ProcessInstance processInstance) {
}
/**
* get TaskExecutionContext
*
......
......@@ -39,8 +39,6 @@ import org.apache.commons.lang.StringUtils;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
/**
......@@ -57,7 +55,6 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
@Autowired
NettyExecutorManager nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class);
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval) {
this.processInstance = processInstance;
......@@ -67,6 +64,18 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
return false;
}
setTaskExecutionLogger();
int taskGroupId = task.getTaskGroupId();
if (taskGroupId > 0) {
boolean acquireTaskGroup = processService.acquireTaskGroup(task.getId(),
task.getName(),
taskGroupId,
task.getProcessInstanceId(),
task.getTaskInstancePriority().getCode());
if (!acquireTaskGroup) {
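// no free slot: keep the task parked in the group queue; it will be dispatched later via dispatch() when woken up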
logger.info("submit task name :{}, but the first time to try to acquire task group failed", taskInstance.getName());
return true;
}
}
dispatchTask(taskInstance, processInstance);
return true;
}
......@@ -76,6 +85,11 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
return this.taskInstance.getState();
}
@Override
public void dispatch(TaskInstance taskInstance, ProcessInstance processInstance) {
this.dispatchTask(taskInstance,processInstance);
}
@Override
public void run() {
}
......
......@@ -36,4 +36,6 @@ public interface ITaskProcessor {
ExecutionStatus taskState();
void dispatch(TaskInstance taskInstance, ProcessInstance processInstance);
}
......@@ -22,7 +22,6 @@ import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
import static org.mockito.Mockito.verify;
import static org.powermock.api.mockito.PowerMockito.mock;
import org.apache.dolphinscheduler.common.enums.CommandType;
......@@ -113,7 +112,6 @@ public class WorkflowExecuteThreadTest {
PowerMockito.doNothing().when(workflowExecuteThread, "endProcess");
}
@Test
public void testParseStartNodeName() throws ParseException {
try {
......
......@@ -38,6 +38,7 @@ import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG;
......@@ -74,6 +75,8 @@ import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskGroup;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
......@@ -94,10 +97,13 @@ import org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
......@@ -212,6 +218,12 @@ public class ProcessService {
@Autowired
private EnvironmentMapper environmentMapper;
@Autowired
private TaskGroupQueueMapper taskGroupQueueMapper;
@Autowired
private TaskGroupMapper taskGroupMapper;
/**
* handle Command (construct ProcessInstance from Command) , wrapped in transaction
*
......@@ -2504,6 +2516,180 @@ public class ProcessService {
return processTaskMap;
}
/**
* acquire the task group slot the first time a task is submitted
*
* @param taskId task instance id
* @param taskName task name
* @param groupId task group id
* @param processId process instance id
* @param priority task priority
* @return true if no task group applies or the slot was acquired, false if the task has to wait in the queue
*/
public boolean acquireTaskGroup(int taskId,
String taskName, int groupId,
int processId, int priority) {
TaskGroup taskGroup = taskGroupMapper.selectById(groupId);
if (taskGroup == null) {
return true;
}
// if task group is not applicable
if (taskGroup.getStatus() == Flag.NO.getCode()) {
return true;
}
TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskId);
if (taskGroupQueue == null) {
taskGroupQueue = insertIntoTaskGroupQueue(taskId, taskName, groupId, processId, priority, TaskGroupQueueStatus.WAIT_QUEUE);
} else {
if (taskGroupQueue.getStatus() == TaskGroupQueueStatus.ACQUIRE_SUCCESS) {
return true;
}
taskGroupQueue.setInQueue(Flag.NO.getCode());
taskGroupQueue.setStatus(TaskGroupQueueStatus.WAIT_QUEUE);
this.taskGroupQueueMapper.updateById(taskGroupQueue);
}
//check priority
List<TaskGroupQueue> highPriorityTasks = taskGroupQueueMapper.queryHighPriorityTasks(groupId, priority, TaskGroupQueueStatus.WAIT_QUEUE.getCode());
if (CollectionUtils.isNotEmpty(highPriorityTasks)) {
this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
return false;
}
//try to get taskGroup
int count = taskGroupMapper.selectAvailableCountById(groupId);
if (count == 1 && robTaskGroupResouce(taskGroupQueue)) {
return true;
}
this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
return false;
}
/**
* try to grab the task group slot, e.g. after another task releases it
*
* @param taskGroupQueue the queue record of the waiting task
* @return true if the slot was acquired, false otherwise
*/
public boolean robTaskGroupResouce(TaskGroupQueue taskGroupQueue) {
TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId());
int affectedCount = taskGroupMapper.updateTaskGroupResource(taskGroup.getId(),taskGroupQueue.getId(),
TaskGroupQueueStatus.WAIT_QUEUE.getCode());
if (affectedCount > 0) {
taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
this.taskGroupQueueMapper.updateById(taskGroupQueue);
this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
return true;
}
return false;
}
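// a woken-up task simply retries the same conditional update used for the first acquisition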
public boolean acquireTaskGroupAgain(TaskGroupQueue taskGroupQueue) {
return robTaskGroupResouce(taskGroupQueue);
}
public void releaseAllTaskGroup(int processInstanceId) {
List<TaskInstance> taskInstances = this.taskInstanceMapper.loadAllInfosNoRelease(processInstanceId, TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode());
for (TaskInstance info : taskInstances) {
releaseTaskGroup(info);
}
}
/**
* release the task group slot when the corresponding task is finished
*
* @param taskInstance the finished task instance holding the slot
* @return the next waiting task instance to wake up, or null if there is none
*/
public TaskInstance releaseTaskGroup(TaskInstance taskInstance) {
TaskGroup taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId());
if (taskGroup == null) {
return null;
}
TaskGroupQueue thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) {
return null;
}
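// conditional UPDATE used as a CAS: retry until this row is released, or bail out if another thread already released it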
try {
while (taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(), taskGroup.getUseSize()
, thisTaskGroupQueue.getId(), TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1) {
thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) {
return null;
}
taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId());
}
} catch (Exception e) {
logger.error("release the task group error",e);
}
logger.info("updateTask:{}",taskInstance.getName());
changeTaskGroupQueueStatus(taskInstance.getId(), TaskGroupQueueStatus.RELEASE);
TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryTheHighestPriorityTasks(taskGroup.getId(),
TaskGroupQueueStatus.WAIT_QUEUE.getCode(), Flag.NO.getCode(), Flag.NO.getCode());
if (taskGroupQueue == null) {
return null;
}
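// mark the chosen waiting row as in-queue with a CAS-style update; if another thread wins, pick the next candidate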
while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(), Flag.YES.getCode(), taskGroupQueue.getId()) != 1) {
taskGroupQueue = this.taskGroupQueueMapper.queryTheHighestPriorityTasks(taskGroup.getId(),
TaskGroupQueueStatus.WAIT_QUEUE.getCode(), Flag.NO.getCode(), Flag.NO.getCode());
if (taskGroupQueue == null) {
return null;
}
}
return this.taskInstanceMapper.selectById(taskGroupQueue.getTaskId());
}
/**
* change the task group queue status of the given task
*
* @param taskId task instance id
* @param status the target task group queue status
*/
public void changeTaskGroupQueueStatus(int taskId, TaskGroupQueueStatus status) {
TaskGroupQueue taskGroupQueue = taskGroupQueueMapper.queryByTaskId(taskId);
taskGroupQueue.setStatus(status);
taskGroupQueue.setUpdateTime(new Date(System.currentTimeMillis()));
taskGroupQueueMapper.updateById(taskGroupQueue);
}
/**
* insert a record into the task group queue
*
* @param taskId task instance id
* @param taskName task name
* @param groupId task group id
* @param processId process instance id
* @param priority task priority
* @param status initial task group queue status
* @return the inserted task group queue record
*/
public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId,
String taskName, Integer groupId,
Integer processId, Integer priority, TaskGroupQueueStatus status) {
TaskGroupQueue taskGroupQueue = new TaskGroupQueue(taskId, taskName, groupId, processId, priority, status);
taskGroupQueueMapper.insert(taskGroupQueue);
return taskGroupQueue;
}
public int updateTaskGroupQueueStatus(Integer taskId, int status) {
return taskGroupQueueMapper.updateStatusByTaskId(taskId, status);
}
public int updateTaskGroupQueue(TaskGroupQueue taskGroupQueue) {
return taskGroupQueueMapper.updateById(taskGroupQueue);
}
public TaskGroupQueue loadTaskGroupQueue(int taskId) {
return this.taskGroupQueueMapper.queryByTaskId(taskId);
}
public void sendStartTask2Master(ProcessInstance processInstance,int taskId,
org.apache.dolphinscheduler.remote.command.CommandType taskType) {
String host = processInstance.getHost();
String address = host.split(":")[0];
int port = Integer.parseInt(host.split(":")[1]);
TaskEventChangeCommand taskEventChangeCommand = new TaskEventChangeCommand(
processInstance.getId(), taskId
);
stateEventCallbackService.sendResult(address, port, taskEventChangeCommand.convert2Command(taskType));
}
public ProcessInstance loadNextProcess4Serial(long code, int state) {
return this.processInstanceMapper.loadNextProcess4Serial(code, state);
}
......
......@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType;
......@@ -47,6 +48,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
......@@ -59,6 +61,8 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.service.cache.processor.impl.UserCacheProcessorImpl;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
......@@ -125,6 +129,10 @@ public class ProcessServiceTest {
private ProcessDefinitionLogMapper processDefineLogMapper;
@Mock
private ResourceMapper resourceMapper;
@Mock
private TaskGroupMapper taskGroupMapper;
@Mock
private TaskGroupQueueMapper taskGroupQueueMapper;
private HashMap<String, ProcessDefinition> processDefinitionCacheMaps = new HashMap<>();
......@@ -761,4 +769,41 @@ public class ProcessServiceTest {
}
@Test
public void testCreateTaskGroupQueue() {
Mockito.when(taskGroupQueueMapper.insert(Mockito.any(TaskGroupQueue.class))).thenReturn(1);
TaskGroupQueue taskGroupQueue = processService.insertIntoTaskGroupQueue(1, "task name", 1, 1, 1, TaskGroupQueueStatus.WAIT_QUEUE);
Assert.assertNotNull(taskGroupQueue);
}
@Test
public void testDoRelease() {
TaskGroupQueue taskGroupQueue = getTaskGroupQueue();
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setTaskGroupId(taskGroupQueue.getGroupId());
Mockito.when(taskGroupQueueMapper.queryByTaskId(1)).thenReturn(taskGroupQueue);
Mockito.when(taskGroupQueueMapper.updateById(taskGroupQueue)).thenReturn(1);
processService.releaseTaskGroup(taskInstance);
}
private TaskGroupQueue getTaskGroupQueue() {
TaskGroupQueue taskGroupQueue = new TaskGroupQueue();
taskGroupQueue.setTaskName("task name");
taskGroupQueue.setId(1);
taskGroupQueue.setGroupId(1);
taskGroupQueue.setTaskId(1);
taskGroupQueue.setPriority(1);
taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
Date date = new Date(System.currentTimeMillis());
taskGroupQueue.setUpdateTime(date);
taskGroupQueue.setCreateTime(date);
return taskGroupQueue;
}
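// A possible further test for the first-time acquisition path; a minimal sketch that only stubs
// mapper calls acquireTaskGroup already makes above. The stubbed values are assumptions, and
// org.apache.dolphinscheduler.dao.entity.TaskGroup would also need to be imported in this test class.
@Test
public void testAcquireTaskGroup() {
    // enabled task group (status = YES), so acquireTaskGroup does not bail out early
    TaskGroup taskGroup = Mockito.mock(TaskGroup.class);
    Mockito.when(taskGroup.getStatus()).thenReturn(Flag.YES.getCode());
    Mockito.when(taskGroupMapper.selectById(1)).thenReturn(taskGroup);
    // the queue row is already ACQUIRE_SUCCESS, so the method reports success immediately
    Mockito.when(taskGroupQueueMapper.queryByTaskId(1)).thenReturn(getTaskGroupQueue());
    Assert.assertTrue(processService.acquireTaskGroup(1, "task name", 1, 1, 1));
}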
}