未验证 提交 f112415b 编写于 作者: Q qiaozhanwei 提交者: GitHub

TaskManager refactor (#2302)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

* add TaskInstanceCacheManager receive Worker report result

* TaskInstance setExecutePath

* add TaskInstanceCacheManager to receive Worker Task result report

* TaskInstanceCacheManager add remove method

* add license

* add dispatcht task method

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify

* worker remove db

* ShellTask modify

* master persistence processId and appIds

* master persistence processId and appIds

* master add kill task logic

* master add kill task logic

* master add kill task logic

* javadoc error modify

* remove chinese log

* executeDirectly method add Override

* remote module modify

* TaskKillResponseProcessor command type modify

* create buildKillCommand

* host add host:port format

* host add host:port format

* TaskAckProcessor modify

* TaskAckProcessor modify

* task prioriry refator

* remove ITaskQueue

* task prioriry refator

* remove ITaskQueue

* TaskPriority refactor

* remove logs

* WorkerServer refactor

* MasterSchedulerService modify

* WorkerConfig listen port modify

* modify master and worker listen port

* cancelTaskInstance set TaskExecutionContext host,logPath,executePath

* cancelTaskInstance set TaskExecutionContext host,logPath,executePath

* Encapsulate the parameters required by sqltask

* 1,Encapsulate the parameters required by sqltask
2,SQLTask optimization

* AbstractTask modify

* ProcedureTask optimization

* MasterSchedulerService modify

* TaskUpdateQueueConsumer modify

* test

* DataxTask process run debug

* DataxTask process run debug

* add protobuf dependency,MR、Spark task etc need this

* TaskUpdateQueueConsumer modify

* TaskExecutionContextBuilder set TaskInstance workgroup

* WorkerGroupService queryAllGroup modify
query available work group

* 1,get workergroup from zk modify
2,SpringConnectionFactory repeat load modify

* master and worker register ip  use OSUtils.getHost()

* ProcessInstance host set ip:port format

* worker fault tolerance modify

* Constants and .env modify

* master fault tolerant bug modify

* UT add pom.xml

* timing online  modify

* when taskResponse is faster than taskAck to db,task state will error
add async queue and new a thread reslove this problem

* TaskExecutionContext set host

* 1,TaskManager refactor
2, api start load server dolphinschedule-daemon.sh modify

* 1,TaskManager refactor
2, api start load server dolphinschedule-daemon.sh modify

* add UT in pom.xml

* revert dolphinscheduler-daemon.sh
Co-authored-by: Nqiaozhanwei <qiaozhanwei@analysys.com.cn>
上级 62f7d21b
......@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.master.consumer;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UdfType;
......@@ -89,12 +88,11 @@ public class TaskUpdateQueueConsumer extends Thread{
public void run() {
while (Stopper.isRunning()){
try {
if (taskUpdateQueue.size() == 0){
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
// if not task , blocking here
String taskPriorityInfo = taskUpdateQueue.take();
TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo);
dispatch(taskPriority.getTaskId());
}catch (Exception e){
logger.error("dispatcher task error",e);
......
......@@ -26,9 +26,6 @@ import java.util.Date;
*/
public class TaskEvent {
public static final String ACK = "ack";
public static final String RESPONSE = "response";
/**
* taskInstanceId
*/
......@@ -77,7 +74,7 @@ public class TaskEvent {
/**
* ack / response
*/
private String type;
private TaskEventEnum type;
/**
......@@ -88,22 +85,21 @@ public class TaskEvent {
* @param executePath executePath
* @param logPath logPath
* @param taskInstanceId taskInstanceId
* @param type type
*/
public void receiveAck(ExecutionStatus state,
Date startTime,
String workerAddress,
String executePath,
String logPath,
int taskInstanceId,
String type){
public TaskEvent(ExecutionStatus state,
Date startTime,
String workerAddress,
String executePath,
String logPath,
int taskInstanceId){
this.state = state;
this.startTime = startTime;
this.workerAddress = workerAddress;
this.executePath = executePath;
this.logPath = logPath;
this.taskInstanceId = taskInstanceId;
this.type = type;
this.type = TaskEventEnum.ACK;
}
/**
......@@ -113,20 +109,18 @@ public class TaskEvent {
* @param processId processId
* @param appIds appIds
* @param taskInstanceId taskInstanceId
* @param type type
*/
public void receiveResponse(ExecutionStatus state,
public TaskEvent(ExecutionStatus state,
Date endTime,
int processId,
String appIds,
int taskInstanceId,
String type){
int taskInstanceId){
this.state = state;
this.endTime = endTime;
this.processId = processId;
this.appIds = appIds;
this.taskInstanceId = taskInstanceId;
this.type = type;
this.type = TaskEventEnum.RESPONSE;
}
public int getTaskInstanceId() {
......@@ -201,11 +195,11 @@ public class TaskEvent {
this.appIds = appIds;
}
public String getType() {
public TaskEventEnum getType() {
return type;
}
public void setType(String type) {
public void setType(TaskEventEnum type) {
this.type = type;
}
......@@ -221,7 +215,7 @@ public class TaskEvent {
", logPath='" + logPath + '\'' +
", processId=" + processId +
", appIds='" + appIds + '\'' +
", type='" + type + '\'' +
", type=" + type +
'}';
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.manager;
import com.baomidou.mybatisplus.annotation.EnumValue;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import java.util.Date;
/**
* task event enum
*/
public enum TaskEventEnum {
ACK(0, "task ack"),
RESPONSE(1, "task response result");
TaskEventEnum(int code, String descp){
this.code = code;
this.descp = descp;
}
@EnumValue
private final int code;
private final String descp;
public String getDescp() {
return descp;
}
public int getCode() {
return code;
}
public static TaskEventEnum of(int status){
for(TaskEventEnum es : values()){
if(es.getCode() == status){
return es;
}
}
throw new IllegalArgumentException("invalid status : " + status);
}
}
......@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.master.manager;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
......@@ -28,6 +27,7 @@ import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import static org.apache.dolphinscheduler.server.master.manager.TaskEventEnum.*;
/**
* task manager
......@@ -56,6 +56,7 @@ public class TaskManager {
@PostConstruct
public void init(){
TaskWorker taskWorker = new TaskWorker();
taskWorker.setName("TaskWorkerThread");
taskWorker.start();
}
......@@ -83,12 +84,8 @@ public class TaskManager {
while (Stopper.isRunning()){
try {
if (attemptQueue.size() == 0){
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
// if not task , blocking here
TaskEvent taskEvent = attemptQueue.take();
persist(taskEvent);
}catch (Exception e){
......@@ -102,19 +99,28 @@ public class TaskManager {
* @param taskEvent taskEvent
*/
private void persist(TaskEvent taskEvent){
if (TaskEvent.ACK.equals(taskEvent.getType())){
processService.changeTaskState(taskEvent.getState(),
taskEvent.getStartTime(),
taskEvent.getWorkerAddress(),
taskEvent.getExecutePath(),
taskEvent.getLogPath(),
taskEvent.getTaskInstanceId());
}else if (TaskEvent.RESPONSE.equals(taskEvent.getType())){
processService.changeTaskState(taskEvent.getState(),
taskEvent.getEndTime(),
taskEvent.getProcessId(),
taskEvent.getAppIds(),
taskEvent.getTaskInstanceId());
// task event type
TaskEventEnum type = taskEvent.getType();
switch (type){
case ACK:
processService.changeTaskState(taskEvent.getState(),
taskEvent.getStartTime(),
taskEvent.getWorkerAddress(),
taskEvent.getExecutePath(),
taskEvent.getLogPath(),
taskEvent.getTaskInstanceId());
break;
case RESPONSE:
processService.changeTaskState(taskEvent.getState(),
taskEvent.getEndTime(),
taskEvent.getProcessId(),
taskEvent.getAppIds(),
taskEvent.getTaskInstanceId());
break;
default:
throw new IllegalArgumentException("invalid task event type : " + type);
}
}
}
......
......@@ -73,14 +73,12 @@ public class TaskAckProcessor implements NettyRequestProcessor {
String workerAddress = ChannelUtils.toAddress(channel).getAddress();
// TaskEvent
TaskEvent taskEvent = new TaskEvent();
taskEvent.receiveAck(ExecutionStatus.of(taskAckCommand.getStatus()),
TaskEvent taskEvent = new TaskEvent(ExecutionStatus.of(taskAckCommand.getStatus()),
taskAckCommand.getStartTime(),
workerAddress,
taskAckCommand.getExecutePath(),
taskAckCommand.getLogPath(),
taskAckCommand.getTaskInstanceId(),
TaskEvent.ACK);
taskAckCommand.getTaskInstanceId());
taskManager.putTask(taskEvent);
......
......@@ -73,13 +73,11 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
taskInstanceCacheManager.cacheTaskInstance(responseCommand);
// TaskEvent
TaskEvent taskEvent = new TaskEvent();
taskEvent.receiveResponse(ExecutionStatus.of(responseCommand.getStatus()),
TaskEvent taskEvent = new TaskEvent(ExecutionStatus.of(responseCommand.getStatus()),
responseCommand.getEndTime(),
responseCommand.getProcessId(),
responseCommand.getAppIds(),
responseCommand.getTaskInstanceId(),
TaskEvent.RESPONSE);
responseCommand.getTaskInstanceId());
taskManager.putTask(taskEvent);
}
......
......@@ -732,6 +732,8 @@
<include>**/server/log/WorkerLogFilterTest.java</include>
<include>**/server/master/executor/NettyExecutorManagerTest.java</include>
<include>**/server/master/host/LowerWeightRoundRobinTest.java</include>
<include>**/server/master/host/RandomSelectorTest.java</include>
<include>**/server/master/host/RoundRobinSelectorTest.java</include>
<include>**/server/master/register/MasterRegistryTest.java</include>
<include>**/server/master/AlertManagerTest.java</include>
<include>**/server/master/MasterCommandTest.java</include>
......@@ -743,6 +745,7 @@
<include>**/server/utils/ParamUtilsTest.java</include>
<include>**/server/utils/ProcessUtilsTest.java</include>
<include>**/server/utils/SparkArgsUtilsTest.java</include>
<include>**/server/worker/processor/TaskCallbackServiceTest.java</include>
<include>**/server/worker/register/WorkerRegistryTest.java</include>
<include>**/server/worker/shell/ShellCommandExecutorTest.java</include>
<include>**/server/worker/sql/SqlExecutorTest.java</include>
......@@ -750,6 +753,7 @@
<include>**/server/worker/task/dependent/DependentTaskTest.java</include>
<include>**/server/worker/task/spark/SparkTaskTest.java</include>
<include>**/server/worker/task/EnvFileTest.java</include>
</includes>
<!-- <skip>true</skip> -->
<argLine>-Xmx2048m</argLine>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册