From 5730bfe2bede516d8879533ec6bf6a619415dcdb Mon Sep 17 00:00:00 2001 From: Tboy Date: Fri, 27 Mar 2020 09:43:45 +0800 Subject: [PATCH] Refactor worker (#2319) * let quartz use the same datasource * move master/worker config from dao.properties to each config add master/worker registry test * move mybatis config from application.properties to SpringConnectionFactory * move mybatis-plus config from application.properties to SpringConnectionFactory * refactor TaskCallbackService * add ZookeeperNodeManagerTest * add NettyExecutorManagerTest * refactor TaskKillProcessor * add RandomSelectorTest, RoundRobinSelectorTest, TaskCallbackServiceTest * add RoundRobinHostManagerTest, ExecutorDispatcherTest * refactor task response service --- .../server/master/manager/TaskEventEnum.java | 58 ----------- .../master/processor/TaskAckProcessor.java | 15 ++- .../processor/TaskResponseProcessor.java | 15 ++- .../queue/TaskResponseEvent.java} | 99 ++++++------------- .../queue/TaskResponseService.java} | 59 ++++++----- .../server/registry/DependencyConfig.java | 6 +- 6 files changed, 77 insertions(+), 175 deletions(-) delete mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEventEnum.java rename dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/{manager/TaskEvent.java => processor/queue/TaskResponseEvent.java} (57%) rename dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/{manager/TaskManager.java => processor/queue/TaskResponseService.java} (59%) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEventEnum.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEventEnum.java deleted file mode 100644 index f3d7497e5..000000000 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEventEnum.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.manager; - -import com.baomidou.mybatisplus.annotation.EnumValue; -import org.apache.dolphinscheduler.common.enums.ExecutionStatus; - -import java.util.Date; - -/** - * task event enum - */ -public enum TaskEventEnum { - - ACK(0, "task ack"), - RESPONSE(1, "task response result"); - - TaskEventEnum(int code, String descp){ - this.code = code; - this.descp = descp; - } - - @EnumValue - private final int code; - private final String descp; - - public String getDescp() { - return descp; - } - - public int getCode() { - return code; - } - - public static TaskEventEnum of(int status){ - for(TaskEventEnum es : values()){ - if(es.getCode() == status){ - return es; - } - } - throw new IllegalArgumentException("invalid status : " + status); - } -} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java index 67832a1b3..1eb40db15 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java @@ -28,10 +28,9 @@ import org.apache.dolphinscheduler.remote.utils.ChannelUtils; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; -import org.apache.dolphinscheduler.server.master.manager.TaskEvent; -import org.apache.dolphinscheduler.server.master.manager.TaskManager; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +44,7 @@ public class TaskAckProcessor implements NettyRequestProcessor { /** * process service */ - private final TaskManager taskManager; + private final TaskResponseService taskResponseService; /** * taskInstance cache manager @@ -53,7 +52,7 @@ public class TaskAckProcessor implements NettyRequestProcessor { private final TaskInstanceCacheManager taskInstanceCacheManager; public TaskAckProcessor(){ - this.taskManager = SpringApplicationContext.getBean(TaskManager.class); + this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); } @@ -72,15 +71,15 @@ public class TaskAckProcessor implements NettyRequestProcessor { String workerAddress = ChannelUtils.toAddress(channel).getAddress(); - // TaskEvent - TaskEvent taskEvent = new TaskEvent(ExecutionStatus.of(taskAckCommand.getStatus()), + // TaskResponseEvent + TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.of(taskAckCommand.getStatus()), taskAckCommand.getStartTime(), workerAddress, taskAckCommand.getExecutePath(), taskAckCommand.getLogPath(), taskAckCommand.getTaskInstanceId()); - taskManager.putTask(taskEvent); + taskResponseService.addResponse(taskResponseEvent); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java index 9cf10b985..36b382313 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java @@ -27,10 +27,9 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; -import org.apache.dolphinscheduler.server.master.manager.TaskEvent; -import org.apache.dolphinscheduler.server.master.manager.TaskManager; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +43,7 @@ public class TaskResponseProcessor implements NettyRequestProcessor { /** * process service */ - private final TaskManager taskManager; + private final TaskResponseService taskResponseService; /** * taskInstance cache manager @@ -52,7 +51,7 @@ public class TaskResponseProcessor implements NettyRequestProcessor { private final TaskInstanceCacheManager taskInstanceCacheManager; public TaskResponseProcessor(){ - this.taskManager = SpringApplicationContext.getBean(TaskManager.class); + this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); } @@ -72,14 +71,14 @@ public class TaskResponseProcessor implements NettyRequestProcessor { taskInstanceCacheManager.cacheTaskInstance(responseCommand); - // TaskEvent - TaskEvent taskEvent = new TaskEvent(ExecutionStatus.of(responseCommand.getStatus()), + // TaskResponseEvent + TaskResponseEvent taskResponseEvent = TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()), responseCommand.getEndTime(), responseCommand.getProcessId(), responseCommand.getAppIds(), responseCommand.getTaskInstanceId()); - taskManager.putTask(taskEvent); + taskResponseService.addResponse(taskResponseEvent); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java similarity index 57% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java index 98b8d62d7..9e8813fd7 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.master.manager; +package org.apache.dolphinscheduler.server.master.processor.queue; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; @@ -24,7 +24,7 @@ import java.util.Date; /** * task event */ -public class TaskEvent { +public class TaskResponseEvent { /** * taskInstanceId @@ -74,53 +74,29 @@ public class TaskEvent { /** * ack / response */ - private TaskEventEnum type; - - - /** - * receive ack info - * @param state state - * @param startTime startTime - * @param workerAddress workerAddress - * @param executePath executePath - * @param logPath logPath - * @param taskInstanceId taskInstanceId - */ - public TaskEvent(ExecutionStatus state, - Date startTime, - String workerAddress, - String executePath, - String logPath, - int taskInstanceId){ - this.state = state; - this.startTime = startTime; - this.workerAddress = workerAddress; - this.executePath = executePath; - this.logPath = logPath; - this.taskInstanceId = taskInstanceId; - this.type = TaskEventEnum.ACK; - - } - - /** - * receive response info - * @param state state - * @param endTime endTime - * @param processId processId - * @param appIds appIds - * @param taskInstanceId taskInstanceId - */ - public TaskEvent(ExecutionStatus state, - Date endTime, - int processId, - String appIds, - int taskInstanceId){ - this.state = state; - this.endTime = endTime; - this.processId = processId; - this.appIds = appIds; - this.taskInstanceId = taskInstanceId; - this.type = TaskEventEnum.RESPONSE; + private Event event; + + public static TaskResponseEvent newAck(ExecutionStatus state, Date startTime, String workerAddress, String executePath, String logPath, int taskInstanceId){ + TaskResponseEvent event = new TaskResponseEvent(); + event.setState(state); + event.setStartTime(startTime); + event.setWorkerAddress(workerAddress); + event.setExecutePath(executePath); + event.setLogPath(logPath); + event.setTaskInstanceId(taskInstanceId); + event.setEvent(Event.ACK); + return event; + } + + public static TaskResponseEvent newResult(ExecutionStatus state, Date endTime, int processId, String appIds, int taskInstanceId){ + TaskResponseEvent event = new TaskResponseEvent(); + event.setState(state); + event.setEndTime(endTime); + event.setProcessId(processId); + event.setAppIds(appIds); + event.setTaskInstanceId(taskInstanceId); + event.setEvent(Event.RESULT); + return event; } public int getTaskInstanceId() { @@ -195,27 +171,16 @@ public class TaskEvent { this.appIds = appIds; } - public TaskEventEnum getType() { - return type; + public Event getEvent() { + return event; } - public void setType(TaskEventEnum type) { - this.type = type; + public void setEvent(Event event) { + this.event = event; } - @Override - public String toString() { - return "TaskEvent{" + - "taskInstanceId=" + taskInstanceId + - ", workerAddress='" + workerAddress + '\'' + - ", state=" + state + - ", startTime=" + startTime + - ", endTime=" + endTime + - ", executePath='" + executePath + '\'' + - ", logPath='" + logPath + '\'' + - ", processId=" + processId + - ", appIds='" + appIds + '\'' + - ", type=" + type + - '}'; + public enum Event{ + ACK, + RESULT; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java similarity index 59% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java index ade0aea44..c471c9c78 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.master.manager; +package org.apache.dolphinscheduler.server.master.processor.queue; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -27,23 +27,22 @@ import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import static org.apache.dolphinscheduler.server.master.manager.TaskEventEnum.*; /** * task manager */ @Component -public class TaskManager { +public class TaskResponseService { /** * logger */ - private static final Logger logger = LoggerFactory.getLogger(TaskManager.class); + private static final Logger logger = LoggerFactory.getLogger(TaskResponseService.class); /** * attemptQueue */ - private final BlockingQueue attemptQueue = new LinkedBlockingQueue<>(5000); + private final BlockingQueue attemptQueue = new LinkedBlockingQueue<>(5000); /** @@ -63,13 +62,13 @@ public class TaskManager { /** * put task to attemptQueue * - * @param taskEvent taskEvent + * @param taskResponseEvent taskResponseEvent */ - public void putTask(TaskEvent taskEvent){ + public void addResponse(TaskResponseEvent taskResponseEvent){ try { - attemptQueue.put(taskEvent); + attemptQueue.put(taskResponseEvent); } catch (InterruptedException e) { - logger.error("put task : {} error :{}",taskEvent,e); + logger.error("put task : {} error :{}", taskResponseEvent,e); } } @@ -85,8 +84,8 @@ public class TaskManager { while (Stopper.isRunning()){ try { // if not task , blocking here - TaskEvent taskEvent = attemptQueue.take(); - persist(taskEvent); + TaskResponseEvent taskResponseEvent = attemptQueue.take(); + persist(taskResponseEvent); }catch (Exception e){ logger.error("persist task error",e); @@ -95,32 +94,30 @@ public class TaskManager { } /** - * persist taskEvent - * @param taskEvent taskEvent + * persist taskResponseEvent + * @param taskResponseEvent taskResponseEvent */ - private void persist(TaskEvent taskEvent){ - // task event type - TaskEventEnum type = taskEvent.getType(); + private void persist(TaskResponseEvent taskResponseEvent){ + TaskResponseEvent.Event event = taskResponseEvent.getEvent(); - switch (type){ + switch (event){ case ACK: - processService.changeTaskState(taskEvent.getState(), - taskEvent.getStartTime(), - taskEvent.getWorkerAddress(), - taskEvent.getExecutePath(), - taskEvent.getLogPath(), - taskEvent.getTaskInstanceId()); + processService.changeTaskState(taskResponseEvent.getState(), + taskResponseEvent.getStartTime(), + taskResponseEvent.getWorkerAddress(), + taskResponseEvent.getExecutePath(), + taskResponseEvent.getLogPath(), + taskResponseEvent.getTaskInstanceId()); break; - case RESPONSE: - processService.changeTaskState(taskEvent.getState(), - taskEvent.getEndTime(), - taskEvent.getProcessId(), - taskEvent.getAppIds(), - taskEvent.getTaskInstanceId()); + case RESULT: + processService.changeTaskState(taskResponseEvent.getState(), + taskResponseEvent.getEndTime(), + taskResponseEvent.getProcessId(), + taskResponseEvent.getAppIds(), + taskResponseEvent.getTaskInstanceId()); break; default: - throw new IllegalArgumentException("invalid task event type : " + type); - + throw new IllegalArgumentException("invalid event type : " + event); } } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java index 2d6e3ecbc..0adea44cf 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java @@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.dao.mapper.*; import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager; import org.apache.dolphinscheduler.server.master.dispatch.host.RandomHostManager; -import org.apache.dolphinscheduler.server.master.manager.TaskManager; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.service.process.ProcessService; import org.mockito.Mockito; @@ -141,7 +141,7 @@ public class DependencyConfig { } @Bean - public TaskManager taskManager(){ - return Mockito.mock(TaskManager.class); + public TaskResponseService taskResponseService(){ + return Mockito.mock(TaskResponseService.class); } } -- GitLab