From f112415b13d7fdd18cf668c2cc639e79cdf08458 Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Tue, 24 Mar 2020 18:54:45 +0800 Subject: [PATCH] TaskManager refactor (#2302) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type * add TaskInstanceCacheManager receive Worker report result * TaskInstance setExecutePath * add TaskInstanceCacheManager to receive Worker Task result report * TaskInstanceCacheManager add remove method * add license * add dispatch task method * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * taskInstanceCache is null, need load from db * taskInstanceCache is null, need load from db * taskInstanceCache is null, need load from db * 1,worker TaskPros use TaskExecutionContext replace 2,Master kill Task , KillTaskProcessor modify * worker remove db * ShellTask modify * master persistence processId and appIds * master persistence processId and appIds * master add kill task logic * master add kill task logic * master add kill task logic * javadoc error modify * remove Chinese log * executeDirectly method add Override * remote module modify * TaskKillResponseProcessor command type modify * create buildKillCommand * host add host:port format * host add host:port format * TaskAckProcessor modify * TaskAckProcessor modify * task priority 
refactor * remove ITaskQueue * task priority refactor * remove ITaskQueue * TaskPriority refactor * remove logs * WorkerServer refactor * MasterSchedulerService modify * WorkerConfig listen port modify * modify master and worker listen port * cancelTaskInstance set TaskExecutionContext host,logPath,executePath * cancelTaskInstance set TaskExecutionContext host,logPath,executePath * Encapsulate the parameters required by sqltask * 1,Encapsulate the parameters required by sqltask 2,SQLTask optimization * AbstractTask modify * ProcedureTask optimization * MasterSchedulerService modify * TaskUpdateQueueConsumer modify * test * DataxTask process run debug * DataxTask process run debug * add protobuf dependency, MR, Spark task etc need this * TaskUpdateQueueConsumer modify * TaskExecutionContextBuilder set TaskInstance workgroup * WorkerGroupService queryAllGroup modify query available work group * 1,get workergroup from zk modify 2,SpringConnectionFactory repeat load modify * master and worker register ip use OSUtils.getHost() * ProcessInstance host set ip:port format * worker fault tolerance modify * Constants and .env modify * master fault tolerant bug modify * UT add pom.xml * timing online modify * when taskResponse reaches the db faster than taskAck, the task state will be wrong; add an async queue and a new thread to resolve this problem * TaskExecutionContext set host * 1,TaskManager refactor 2, api start load server dolphinscheduler-daemon.sh modify * 1,TaskManager refactor 2, api start load server dolphinscheduler-daemon.sh modify * add UT in pom.xml * revert dolphinscheduler-daemon.sh Co-authored-by: qiaozhanwei --- .../consumer/TaskUpdateQueueConsumer.java | 8 +-- .../server/master/manager/TaskEvent.java | 36 +++++------- .../server/master/manager/TaskEventEnum.java | 58 +++++++++++++++++++ .../server/master/manager/TaskManager.java | 44 ++++++++------ .../master/processor/TaskAckProcessor.java | 6 +- .../processor/TaskResponseProcessor.java | 6 +- pom.xml | 4 ++ 7 files changed, 109 
insertions(+), 53 deletions(-) create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEventEnum.java diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java index 5f66c1b28..e7b632717 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.master.consumer; import com.alibaba.fastjson.JSONObject; -import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.UdfType; @@ -89,12 +88,11 @@ public class TaskUpdateQueueConsumer extends Thread{ public void run() { while (Stopper.isRunning()){ try { - if (taskUpdateQueue.size() == 0){ - Thread.sleep(Constants.SLEEP_TIME_MILLIS); - continue; - } + // if not task , blocking here String taskPriorityInfo = taskUpdateQueue.take(); + TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo); + dispatch(taskPriority.getTaskId()); }catch (Exception e){ logger.error("dispatcher task error",e); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java index 5c6740f50..98b8d62d7 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java @@ -26,9 
+26,6 @@ import java.util.Date; */ public class TaskEvent { - public static final String ACK = "ack"; - public static final String RESPONSE = "response"; - /** * taskInstanceId */ @@ -77,7 +74,7 @@ public class TaskEvent { /** * ack / response */ - private String type; + private TaskEventEnum type; /** @@ -88,22 +85,21 @@ public class TaskEvent { * @param executePath executePath * @param logPath logPath * @param taskInstanceId taskInstanceId - * @param type type */ - public void receiveAck(ExecutionStatus state, - Date startTime, - String workerAddress, - String executePath, - String logPath, - int taskInstanceId, - String type){ + public TaskEvent(ExecutionStatus state, + Date startTime, + String workerAddress, + String executePath, + String logPath, + int taskInstanceId){ this.state = state; this.startTime = startTime; this.workerAddress = workerAddress; this.executePath = executePath; this.logPath = logPath; this.taskInstanceId = taskInstanceId; - this.type = type; + this.type = TaskEventEnum.ACK; + } /** @@ -113,20 +109,18 @@ public class TaskEvent { * @param processId processId * @param appIds appIds * @param taskInstanceId taskInstanceId - * @param type type */ - public void receiveResponse(ExecutionStatus state, + public TaskEvent(ExecutionStatus state, Date endTime, int processId, String appIds, - int taskInstanceId, - String type){ + int taskInstanceId){ this.state = state; this.endTime = endTime; this.processId = processId; this.appIds = appIds; this.taskInstanceId = taskInstanceId; - this.type = type; + this.type = TaskEventEnum.RESPONSE; } public int getTaskInstanceId() { @@ -201,11 +195,11 @@ public class TaskEvent { this.appIds = appIds; } - public String getType() { + public TaskEventEnum getType() { return type; } - public void setType(String type) { + public void setType(TaskEventEnum type) { this.type = type; } @@ -221,7 +215,7 @@ public class TaskEvent { ", logPath='" + logPath + '\'' + ", processId=" + processId + ", appIds='" + appIds + '\'' + 
- ", type='" + type + '\'' + + ", type=" + type + '}'; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEventEnum.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEventEnum.java new file mode 100644 index 000000000..f3d7497e5 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEventEnum.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.server.master.manager; + +import com.baomidou.mybatisplus.annotation.EnumValue; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; + +import java.util.Date; + +/** + * task event enum + */ +public enum TaskEventEnum { + + ACK(0, "task ack"), + RESPONSE(1, "task response result"); + + TaskEventEnum(int code, String descp){ + this.code = code; + this.descp = descp; + } + + @EnumValue + private final int code; + private final String descp; + + public String getDescp() { + return descp; + } + + public int getCode() { + return code; + } + + public static TaskEventEnum of(int status){ + for(TaskEventEnum es : values()){ + if(es.getCode() == status){ + return es; + } + } + throw new IllegalArgumentException("invalid status : " + status); + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java index a2710ee48..ade0aea44 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.master.manager; -import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; @@ -28,6 +27,7 @@ import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import static org.apache.dolphinscheduler.server.master.manager.TaskEventEnum.*; /** * task manager @@ -56,6 +56,7 @@ public class TaskManager { @PostConstruct public void init(){ TaskWorker taskWorker = new TaskWorker(); + 
taskWorker.setName("TaskWorkerThread"); taskWorker.start(); } @@ -83,12 +84,8 @@ public class TaskManager { while (Stopper.isRunning()){ try { - if (attemptQueue.size() == 0){ - Thread.sleep(Constants.SLEEP_TIME_MILLIS); - continue; - } + // if not task , blocking here TaskEvent taskEvent = attemptQueue.take(); - persist(taskEvent); }catch (Exception e){ @@ -102,19 +99,28 @@ public class TaskManager { * @param taskEvent taskEvent */ private void persist(TaskEvent taskEvent){ - if (TaskEvent.ACK.equals(taskEvent.getType())){ - processService.changeTaskState(taskEvent.getState(), - taskEvent.getStartTime(), - taskEvent.getWorkerAddress(), - taskEvent.getExecutePath(), - taskEvent.getLogPath(), - taskEvent.getTaskInstanceId()); - }else if (TaskEvent.RESPONSE.equals(taskEvent.getType())){ - processService.changeTaskState(taskEvent.getState(), - taskEvent.getEndTime(), - taskEvent.getProcessId(), - taskEvent.getAppIds(), - taskEvent.getTaskInstanceId()); + // task event type + TaskEventEnum type = taskEvent.getType(); + + switch (type){ + case ACK: + processService.changeTaskState(taskEvent.getState(), + taskEvent.getStartTime(), + taskEvent.getWorkerAddress(), + taskEvent.getExecutePath(), + taskEvent.getLogPath(), + taskEvent.getTaskInstanceId()); + break; + case RESPONSE: + processService.changeTaskState(taskEvent.getState(), + taskEvent.getEndTime(), + taskEvent.getProcessId(), + taskEvent.getAppIds(), + taskEvent.getTaskInstanceId()); + break; + default: + throw new IllegalArgumentException("invalid task event type : " + type); + } } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java index a678caddf..67832a1b3 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java +++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java @@ -73,14 +73,12 @@ public class TaskAckProcessor implements NettyRequestProcessor { String workerAddress = ChannelUtils.toAddress(channel).getAddress(); // TaskEvent - TaskEvent taskEvent = new TaskEvent(); - taskEvent.receiveAck(ExecutionStatus.of(taskAckCommand.getStatus()), + TaskEvent taskEvent = new TaskEvent(ExecutionStatus.of(taskAckCommand.getStatus()), taskAckCommand.getStartTime(), workerAddress, taskAckCommand.getExecutePath(), taskAckCommand.getLogPath(), - taskAckCommand.getTaskInstanceId(), - TaskEvent.ACK); + taskAckCommand.getTaskInstanceId()); taskManager.putTask(taskEvent); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java index ffc5d7293..9cf10b985 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java @@ -73,13 +73,11 @@ public class TaskResponseProcessor implements NettyRequestProcessor { taskInstanceCacheManager.cacheTaskInstance(responseCommand); // TaskEvent - TaskEvent taskEvent = new TaskEvent(); - taskEvent.receiveResponse(ExecutionStatus.of(responseCommand.getStatus()), + TaskEvent taskEvent = new TaskEvent(ExecutionStatus.of(responseCommand.getStatus()), responseCommand.getEndTime(), responseCommand.getProcessId(), responseCommand.getAppIds(), - responseCommand.getTaskInstanceId(), - TaskEvent.RESPONSE); + responseCommand.getTaskInstanceId()); taskManager.putTask(taskEvent); } diff --git a/pom.xml b/pom.xml index e2b0e6690..e7b95d9cf 100644 --- a/pom.xml +++ b/pom.xml @@ -732,6 +732,8 @@ **/server/log/WorkerLogFilterTest.java 
**/server/master/executor/NettyExecutorManagerTest.java **/server/master/host/LowerWeightRoundRobinTest.java + **/server/master/host/RandomSelectorTest.java + **/server/master/host/RoundRobinSelectorTest.java **/server/master/register/MasterRegistryTest.java **/server/master/AlertManagerTest.java **/server/master/MasterCommandTest.java @@ -743,6 +745,7 @@ **/server/utils/ParamUtilsTest.java **/server/utils/ProcessUtilsTest.java **/server/utils/SparkArgsUtilsTest.java + **/server/worker/processor/TaskCallbackServiceTest.java **/server/worker/register/WorkerRegistryTest.java **/server/worker/shell/ShellCommandExecutorTest.java **/server/worker/sql/SqlExecutorTest.java @@ -750,6 +753,7 @@ **/server/worker/task/dependent/DependentTaskTest.java **/server/worker/task/spark/SparkTaskTest.java **/server/worker/task/EnvFileTest.java + -Xmx2048m -- GitLab