未验证 提交 eb45ff9e 编写于 作者: Q qiaozhanwei 提交者: GitHub

TaskPriority refactor (#2097)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

* add TaskInstanceCacheManager receive Worker report result

* TaskInstance setExecutePath

* add TaskInstanceCacheManager to receive Worker Task result report

* TaskInstanceCacheManager add remove method

* add license

* add dispatcht task method

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify

* worker remove db

* ShellTask modify

* master persistence processId and appIds

* master persistence processId and appIds

* master add kill task logic

* master add kill task logic

* master add kill task logic

* javadoc error modify

* remove chinese log

* executeDirectly method add Override

* remote module modify

* TaskKillResponseProcessor command type modify

* create buildKillCommand

* host add host:port format

* host add host:port format

* TaskAckProcessor modify

* TaskAckProcessor modify

* task prioriry refator

* remove ITaskQueue

* task prioriry refator

* remove ITaskQueue

* TaskPriority refactor

* remove logs
Co-authored-by: Nqiaozhanwei <qiaozhanwei@analysys.com.cn>
上级 8aaee39a
......@@ -26,7 +26,6 @@ import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import io.swagger.annotations.*;
import org.apache.dolphinscheduler.service.queue.ITaskQueue;
import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -240,8 +239,7 @@ public class ProcessInstanceController extends BaseController{
logger.info("delete process instance by id, login user:{}, project name:{}, process instance id:{}",
loginUser.getUserName(), projectName, processInstanceId);
// task queue
ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
Map<String, Object> result = processInstanceService.deleteProcessInstanceById(loginUser, projectName, processInstanceId,tasksQueue);
Map<String, Object> result = processInstanceService.deleteProcessInstanceById(loginUser, projectName, processInstanceId);
return returnDataList(result);
}catch (Exception e){
logger.error(DELETE_PROCESS_INSTANCE_BY_ID_ERROR.getMsg(),e);
......@@ -370,7 +368,6 @@ public class ProcessInstanceController extends BaseController{
logger.info("delete process instance by ids, login user:{}, project name:{}, process instance ids :{}",
loginUser.getUserName(), projectName, processInstanceIds);
// task queue
ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
Map<String, Object> result = new HashMap<>(5);
List<String> deleteFailedIdList = new ArrayList<>();
if(StringUtils.isNotEmpty(processInstanceIds)){
......@@ -379,7 +376,7 @@ public class ProcessInstanceController extends BaseController{
for (String strProcessInstanceId:processInstanceIdArray) {
int processInstanceId = Integer.parseInt(strProcessInstanceId);
try {
Map<String, Object> deleteResult = processInstanceService.deleteProcessInstanceById(loginUser, projectName, processInstanceId,tasksQueue);
Map<String, Object> deleteResult = processInstanceService.deleteProcessInstanceById(loginUser, projectName, processInstanceId);
if(!Status.SUCCESS.equals(deleteResult.get(Constants.STATUS))){
deleteFailedIdList.add(strProcessInstanceId);
logger.error((String)deleteResult.get(Constants.MSG));
......
......@@ -29,8 +29,6 @@ import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.ITaskQueue;
import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -318,9 +316,8 @@ public class DataAnalysisService extends BaseService{
return result;
}
ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
List<String> tasksQueueList = tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
List<String> tasksKillList = tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_KILL);
List<String> tasksQueueList = new ArrayList<>();
List<String> tasksKillList = new ArrayList<>();
Map<String,Integer> dataMap = new HashMap<>();
if (loginUser.getUserType() == UserType.ADMIN_USER){
......
......@@ -38,7 +38,6 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.ITaskQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -404,8 +403,6 @@ public class ProcessInstanceService extends BaseDAGService {
processInstance.setProcessInstanceJson(processInstanceJson);
processInstance.setGlobalParams(globalParams);
}
// int update = processDao.updateProcessInstance(processInstanceId, processInstanceJson,
// globalParams, schedule, flag, locations, connects);
int update = processService.updateProcessInstance(processInstance);
int updateDefine = 1;
if (syncDefine && StringUtils.isNotEmpty(processInstanceJson)) {
......@@ -472,11 +469,10 @@ public class ProcessInstanceService extends BaseDAGService {
* @param loginUser login user
* @param projectName project name
* @param processInstanceId process instance id
* @param tasksQueue task queue
* @return delete result code
*/
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> deleteProcessInstanceById(User loginUser, String projectName, Integer processInstanceId, ITaskQueue tasksQueue) {
public Map<String, Object> deleteProcessInstanceById(User loginUser, String projectName, Integer processInstanceId) {
Map<String, Object> result = new HashMap<>(5);
Project project = projectMapper.queryByName(projectName);
......@@ -494,52 +490,6 @@ public class ProcessInstanceService extends BaseDAGService {
return result;
}
//process instance priority
int processInstancePriority = processInstance.getProcessInstancePriority().ordinal();
// delete zk queue
if (CollectionUtils.isNotEmpty(taskInstanceList)){
for (TaskInstance taskInstance : taskInstanceList){
// task instance priority
int taskInstancePriority = taskInstance.getTaskInstancePriority().ordinal();
StringBuilder nodeValueSb = new StringBuilder(100);
nodeValueSb.append(processInstancePriority)
.append(UNDERLINE)
.append(processInstanceId)
.append(UNDERLINE)
.append(taskInstancePriority)
.append(UNDERLINE)
.append(taskInstance.getId())
.append(UNDERLINE);
int taskWorkerGroupId = processService.getTaskWorkerGroupId(taskInstance);
WorkerGroup workerGroup = workerGroupMapper.selectById(taskWorkerGroupId);
if(workerGroup == null){
nodeValueSb.append(DEFAULT_WORKER_ID);
}else {
String ips = workerGroup.getIpList();
StringBuilder ipSb = new StringBuilder(100);
String[] ipArray = ips.split(COMMA);
for (String ip : ipArray) {
long ipLong = IpUtils.ipToLong(ip);
ipSb.append(ipLong).append(COMMA);
}
if(ipSb.length() > 0) {
ipSb.deleteCharAt(ipSb.length() - 1);
}
nodeValueSb.append(ipSb);
}
logger.info("delete task queue node : {}",nodeValueSb.toString());
tasksQueue.removeNode(org.apache.dolphinscheduler.common.Constants.DOLPHINSCHEDULER_TASKS_QUEUE, nodeValueSb.toString());
}
}
// delete database cascade
int delete = processService.deleteWorkProcessInstanceById(processInstanceId);
processService.deleteAllSubWorkProcessByParentId(processInstanceId);
......
......@@ -28,7 +28,6 @@ import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.ITaskQueue;
import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.junit.After;
import org.junit.Assert;
......@@ -74,8 +73,7 @@ public class DataAnalysisServiceTest {
@Mock
TaskInstanceMapper taskInstanceMapper;
@Mock
ITaskQueue taskQueue;
@Mock
ProcessService processService;
......@@ -183,30 +181,6 @@ public class DataAnalysisServiceTest {
}
@Test
public void testCountQueueState(){
PowerMockito.mockStatic(TaskQueueFactory.class);
List<String> taskQueueList = new ArrayList<>(1);
taskQueueList.add("1_0_1_1_-1");
List<String> taskKillList = new ArrayList<>(1);
taskKillList.add("1-0");
PowerMockito.when(taskQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE)).thenReturn(taskQueueList);
PowerMockito.when(taskQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_KILL)).thenReturn(taskKillList);
PowerMockito.when(TaskQueueFactory.getTaskQueueInstance()).thenReturn(taskQueue);
//checkProject false
Map<String, Object> result = dataAnalysisService.countQueueState(user,2);
Assert.assertTrue(result.isEmpty());
result = dataAnalysisService.countQueueState(user,1);
Assert.assertEquals(Status.SUCCESS,result.get(Constants.STATUS));
//admin
user.setUserType(UserType.ADMIN_USER);
result = dataAnalysisService.countQueueState(user,1);
Assert.assertEquals(Status.SUCCESS,result.get(Constants.STATUS));
}
/**
* get list
* @return
......
......@@ -194,6 +194,7 @@ public class TaskInstance implements Serializable {
/**
* workerGroup
*/
@TableField(exist = false)
private String workerGroup;
public ProcessInstance getProcessInstance() {
......
......@@ -132,7 +132,8 @@ public class TaskPriority {
*/
public static TaskPriority of(String taskPriorityInfo){
String[] parts = taskPriorityInfo.split(UNDERLINE);
if (parts.length != 4) {
if (parts.length != 5) {
throw new IllegalArgumentException(String.format("TaskPriority : %s illegal.", taskPriorityInfo));
}
TaskPriority taskPriority = new TaskPriority(
......
......@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.consumer.TaskUpdateQueueConsumer;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
......@@ -37,6 +38,7 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob;
import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
import org.apache.dolphinscheduler.service.queue.TaskUpdateQueueImpl;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -166,7 +168,8 @@ public class MasterServer implements IStoppable {
logger.error("start Quartz failed", e);
}
TaskUpdateQueueConsumer taskUpdateQueueConsumer = SpringApplicationContext.getBean(TaskUpdateQueueConsumer.class);
taskUpdateQueueConsumer.start();
/**
* register hooks, which are called before the process exits
*/
......
......@@ -89,7 +89,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
this.cancel = false;
this.taskInstance = taskInstance;
this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
this.taskUpdateQueue = new TaskUpdateQueueImpl();
this.taskUpdateQueue = SpringApplicationContext.getBean(TaskUpdateQueueImpl.class);
}
/**
......@@ -180,8 +180,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
processInstance.getId(),
taskInstance.getProcessInstancePriority().getCode(),
taskInstance.getId(),
taskInstance.getWorkerGroup());
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP);
taskUpdateQueue.put(taskPriorityInfo);
logger.info(String.format("master submit success, task : %s", taskInstance.getName()) );
return true;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册