diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java index 4984390527cc92bf14d6d88c37bf514d4694b7e6..924dd131f8095ec3d99a37fe27828aef906637b4 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java @@ -17,24 +17,14 @@ package org.apache.dolphinscheduler.server.master.processor.queue; -import org.apache.dolphinscheduler.common.enums.Event; -import org.apache.dolphinscheduler.common.enums.StateEvent; -import org.apache.dolphinscheduler.common.enums.StateEventType; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.thread.Stopper; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; -import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand; -import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand; -import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; -import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; -import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; -import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator; -import org.apache.dolphinscheduler.service.process.ProcessService; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -44,8 +34,6 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import io.netty.channel.Channel; - /** * task manager */ @@ -62,46 +50,39 @@ public class TaskEventService { */ private final BlockingQueue eventQueue = new LinkedBlockingQueue<>(); - /** - * process service - */ - @Autowired - private ProcessService processService; - - /** - * data quality result operator - */ - @Autowired - private DataQualityResultOperator dataQualityResultOperator; - /** * task event worker */ - private Thread taskEventWorker; + private Thread taskEventThread; - @Autowired - private ProcessInstanceExecCacheManager processInstanceExecCacheManager; + private Thread taskEventHandlerThread; @Autowired - private WorkflowExecuteThreadPool workflowExecuteThreadPool; + private TaskExecuteThreadPool taskExecuteThreadPool; @PostConstruct public void start() { - this.taskEventWorker = new TaskEventWorker(); - this.taskEventWorker.setName("TaskStateEventWorker"); - this.taskEventWorker.start(); + this.taskEventThread = new TaskEventThread(); + this.taskEventThread.setName("TaskEventThread"); + this.taskEventThread.start(); + + this.taskEventHandlerThread = new TaskEventHandlerThread(); + this.taskEventHandlerThread.setName("TaskEventHandlerThread"); + this.taskEventHandlerThread.start(); } @PreDestroy public void stop() { try { - this.taskEventWorker.interrupt(); + this.taskEventThread.interrupt(); + this.taskEventHandlerThread.interrupt(); if (!eventQueue.isEmpty()) { List remainEvents = new ArrayList<>(eventQueue.size()); eventQueue.drainTo(remainEvents); - for (TaskEvent event : remainEvents) { - this.persist(event); + for (TaskEvent taskEvent : remainEvents) { + taskExecuteThreadPool.submitTaskEvent(taskEvent); } + taskExecuteThreadPool.eventHandler(); } } catch (Exception e) { logger.error("stop error:", e); @@ -109,32 +90,25 @@ public class TaskEventService { } /** - * add event to queue + * add event * * @param taskEvent taskEvent */ public void addEvent(TaskEvent taskEvent) { - try { - eventQueue.put(taskEvent); - } catch (InterruptedException e) { - logger.error("add task event : {} error :{}", taskEvent, e); - Thread.currentThread().interrupt(); - } + taskExecuteThreadPool.submitTaskEvent(taskEvent); } /** * task worker thread */ - class TaskEventWorker extends Thread { - + class TaskEventThread extends Thread { @Override public void run() { - while (Stopper.isRunning()) { try { // if not task , blocking here TaskEvent taskEvent = eventQueue.take(); - persist(taskEvent); + taskExecuteThreadPool.submitTaskEvent(taskEvent); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; @@ -147,121 +121,24 @@ public class TaskEventService { } /** - * persist task event - * - * @param taskEvent taskEvent - */ - private void persist(TaskEvent taskEvent) { - Event event = taskEvent.getEvent(); - int taskInstanceId = taskEvent.getTaskInstanceId(); - int processInstanceId = taskEvent.getProcessInstanceId(); - - TaskInstance taskInstance; - WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); - if (workflowExecuteThread != null && workflowExecuteThread.checkTaskInstanceById(taskInstanceId)) { - taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId); - } else { - taskInstance = processService.findTaskInstanceById(taskInstanceId); - } - - switch (event) { - case DISPATCH: - handleDispatchEvent(taskEvent, taskInstance); - // dispatch event do not need to submit state event - return; - case DELAY: - case RUNNING: - handleRunningEvent(taskEvent, taskInstance); - break; - case RESULT: - handleResultEvent(taskEvent, taskInstance); - break; - default: - throw new IllegalArgumentException("invalid event type : " + event); - } - - StateEvent stateEvent = new StateEvent(); - stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId()); - stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId()); - stateEvent.setExecutionStatus(taskEvent.getState()); - stateEvent.setType(StateEventType.TASK_STATE_CHANGE); - workflowExecuteThreadPool.submitStateEvent(stateEvent); - } - - /** - * handle dispatch event + * event handler thread */ - private void handleDispatchEvent(TaskEvent taskEvent, TaskInstance taskInstance) { - if (taskInstance == null) { - logger.error("taskInstance is null"); - return; - } - if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) { - return; - } - taskInstance.setState(ExecutionStatus.DISPATCH); - taskInstance.setHost(taskEvent.getWorkerAddress()); - processService.saveTaskInstance(taskInstance); - } + class TaskEventHandlerThread extends Thread { - /** - * handle running event - */ - private void handleRunningEvent(TaskEvent taskEvent, TaskInstance taskInstance) { - Channel channel = taskEvent.getChannel(); - try { - if (taskInstance != null) { - if (taskInstance.getState().typeIsFinished()) { - logger.warn("task is finish, running event is meaningless, taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState()); - } else { - taskInstance.setState(taskEvent.getState()); - taskInstance.setStartTime(taskEvent.getStartTime()); - taskInstance.setHost(taskEvent.getWorkerAddress()); - taskInstance.setLogPath(taskEvent.getLogPath()); - taskInstance.setExecutePath(taskEvent.getExecutePath()); - taskInstance.setPid(taskEvent.getProcessId()); - taskInstance.setAppLink(taskEvent.getAppIds()); - processService.saveTaskInstance(taskInstance); + @Override + public void run() { + logger.info("event handler thread started"); + while (Stopper.isRunning()) { + try { + taskExecuteThreadPool.eventHandler(); + TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } catch (Exception e) { + logger.error("event handler thread error", e); } } - // if taskInstance is null (maybe deleted) or finish. retry will be meaningless . so ack success - TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId()); - channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command()); - } catch (Exception e) { - logger.error("worker ack master error", e); - TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.FAILURE.getCode(), -1); - channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command()); - } - } - - /** - * handle result event - */ - private void handleResultEvent(TaskEvent taskEvent, TaskInstance taskInstance) { - Channel channel = taskEvent.getChannel(); - try { - if (taskInstance != null) { - dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance); - - taskInstance.setStartTime(taskEvent.getStartTime()); - taskInstance.setHost(taskEvent.getWorkerAddress()); - taskInstance.setLogPath(taskEvent.getLogPath()); - taskInstance.setExecutePath(taskEvent.getExecutePath()); - taskInstance.setPid(taskEvent.getProcessId()); - taskInstance.setAppLink(taskEvent.getAppIds()); - taskInstance.setState(taskEvent.getState()); - taskInstance.setEndTime(taskEvent.getEndTime()); - taskInstance.setVarPool(taskEvent.getVarPool()); - processService.changeOutParam(taskInstance); - processService.saveTaskInstance(taskInstance); - } - // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success - TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId()); - channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command()); - } catch (Exception e) { - logger.error("worker response master error", e); - TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.FAILURE.getCode(), -1); - channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command()); } } } \ No newline at end of file diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java new file mode 100644 index 0000000000000000000000000000000000000000..47b190e24647df6cf2358fc1b80253e99e5254f5 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.processor.queue; + +import org.apache.dolphinscheduler.common.enums.Event; +import org.apache.dolphinscheduler.common.enums.StateEvent; +import org.apache.dolphinscheduler.common.enums.StateEventType; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; +import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; +import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator; +import org.apache.dolphinscheduler.service.process.ProcessService; + +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.channel.Channel; + +/** + * task execute thread + */ +public class TaskExecuteThread { + + private static final Logger logger = LoggerFactory.getLogger(TaskExecuteThread.class); + + private final int processInstanceId; + + private final ConcurrentLinkedQueue events = new ConcurrentLinkedQueue<>(); + + private ProcessService processService; + + private WorkflowExecuteThreadPool workflowExecuteThreadPool; + + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; + + private DataQualityResultOperator dataQualityResultOperator; + + public TaskExecuteThread(int processInstanceId, ProcessService processService, WorkflowExecuteThreadPool workflowExecuteThreadPool, + ProcessInstanceExecCacheManager processInstanceExecCacheManager, DataQualityResultOperator dataQualityResultOperator) { + this.processInstanceId = processInstanceId; + this.processService = processService; + this.workflowExecuteThreadPool = workflowExecuteThreadPool; + this.processInstanceExecCacheManager = processInstanceExecCacheManager; + this.dataQualityResultOperator = dataQualityResultOperator; + } + + public void run() { + while (!this.events.isEmpty()) { + TaskEvent event = this.events.peek(); + try { + persist(event); + } catch (Exception e) { + logger.error("persist error, event:{}, error: {}", event, e); + } finally { + this.events.remove(event); + } + } + } + + public String getKey() { + return String.valueOf(processInstanceId); + } + + public int eventSize() { + return this.events.size(); + } + + public boolean isEmpty() { + return this.events.isEmpty(); + } + + public Integer getProcessInstanceId() { + return processInstanceId; + } + + public boolean addEvent(TaskEvent event) { + if (event.getProcessInstanceId() != this.processInstanceId) { + logger.warn("event would be abounded, task instance id:{}, process instance id:{}, this.processInstanceId:{}", + event.getTaskInstanceId(), event.getProcessInstanceId(), this.processInstanceId); + return false; + } + return this.events.add(event); + } + + /** + * persist task event + * + * @param taskEvent taskEvent + */ + private void persist(TaskEvent taskEvent) { + Event event = taskEvent.getEvent(); + int taskInstanceId = taskEvent.getTaskInstanceId(); + int processInstanceId = taskEvent.getProcessInstanceId(); + + TaskInstance taskInstance; + WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); + if (workflowExecuteThread != null && workflowExecuteThread.checkTaskInstanceById(taskInstanceId)) { + taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId); + } else { + taskInstance = processService.findTaskInstanceById(taskInstanceId); + } + + switch (event) { + case DISPATCH: + handleDispatchEvent(taskEvent, taskInstance); + // dispatch event do not need to submit state event + return; + case DELAY: + case RUNNING: + handleRunningEvent(taskEvent, taskInstance); + break; + case RESULT: + handleResultEvent(taskEvent, taskInstance); + break; + default: + throw new IllegalArgumentException("invalid event type : " + event); + } + + StateEvent stateEvent = new StateEvent(); + stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId()); + stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId()); + stateEvent.setExecutionStatus(taskEvent.getState()); + stateEvent.setType(StateEventType.TASK_STATE_CHANGE); + workflowExecuteThreadPool.submitStateEvent(stateEvent); + } + + /** + * handle dispatch event + */ + private void handleDispatchEvent(TaskEvent taskEvent, TaskInstance taskInstance) { + if (taskInstance == null) { + logger.error("taskInstance is null"); + return; + } + if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) { + return; + } + taskInstance.setState(ExecutionStatus.DISPATCH); + taskInstance.setHost(taskEvent.getWorkerAddress()); + processService.saveTaskInstance(taskInstance); + } + + /** + * handle running event + */ + private void handleRunningEvent(TaskEvent taskEvent, TaskInstance taskInstance) { + Channel channel = taskEvent.getChannel(); + try { + if (taskInstance != null) { + if (taskInstance.getState().typeIsFinished()) { + logger.warn("task is finish, running event is meaningless, taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState()); + } else { + taskInstance.setState(taskEvent.getState()); + taskInstance.setStartTime(taskEvent.getStartTime()); + taskInstance.setHost(taskEvent.getWorkerAddress()); + taskInstance.setLogPath(taskEvent.getLogPath()); + taskInstance.setExecutePath(taskEvent.getExecutePath()); + taskInstance.setPid(taskEvent.getProcessId()); + taskInstance.setAppLink(taskEvent.getAppIds()); + processService.saveTaskInstance(taskInstance); + } + } + // if taskInstance is null (maybe deleted) or finish. retry will be meaningless . so ack success + TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId()); + channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command()); + } catch (Exception e) { + logger.error("worker ack master error", e); + TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.FAILURE.getCode(), -1); + channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command()); + } + } + + /** + * handle result event + */ + private void handleResultEvent(TaskEvent taskEvent, TaskInstance taskInstance) { + Channel channel = taskEvent.getChannel(); + try { + if (taskInstance != null) { + dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance); + + taskInstance.setStartTime(taskEvent.getStartTime()); + taskInstance.setHost(taskEvent.getWorkerAddress()); + taskInstance.setLogPath(taskEvent.getLogPath()); + taskInstance.setExecutePath(taskEvent.getExecutePath()); + taskInstance.setPid(taskEvent.getProcessId()); + taskInstance.setAppLink(taskEvent.getAppIds()); + taskInstance.setState(taskEvent.getState()); + taskInstance.setEndTime(taskEvent.getEndTime()); + taskInstance.setVarPool(taskEvent.getVarPool()); + processService.changeOutParam(taskInstance); + processService.saveTaskInstance(taskInstance); + } + // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success + TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId()); + channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command()); + } catch (Exception e) { + logger.error("worker response master error", e); + TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.FAILURE.getCode(), -1); + channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command()); + } + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java new file mode 100644 index 0000000000000000000000000000000000000000..ccdab1b97f48945676ae66f7b8ecb831ca9b631a --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.processor.queue; + +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; +import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator; +import org.apache.dolphinscheduler.service.process.ProcessService; + +import java.util.concurrent.ConcurrentHashMap; + +import javax.annotation.PostConstruct; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; + +@Component +public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor { + + private static final Logger logger = LoggerFactory.getLogger(TaskExecuteThreadPool.class); + + private final ConcurrentHashMap multiThreadFilterMap = new ConcurrentHashMap<>(); + + @Autowired + private MasterConfig masterConfig; + + @Autowired + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; + + /** + * process service + */ + @Autowired + private ProcessService processService; + + /** + * data quality result operator + */ + @Autowired + private DataQualityResultOperator dataQualityResultOperator; + + + @Autowired + private WorkflowExecuteThreadPool workflowExecuteThreadPool; + + /** + * task event thread map + */ + private final ConcurrentHashMap taskExecuteThreadMap = new ConcurrentHashMap<>(); + + @PostConstruct + private void init() { + this.setDaemon(true); + this.setThreadNamePrefix("Task-Execute-Thread-"); + this.setMaxPoolSize(masterConfig.getExecThreads()); + this.setCorePoolSize(masterConfig.getExecThreads()); + } + + public void submitTaskEvent(TaskEvent taskEvent) { + if (!processInstanceExecCacheManager.contains(taskEvent.getProcessInstanceId())) { + logger.warn("workflowExecuteThread is null, event: {}", taskEvent); + return; + } + if (!taskExecuteThreadMap.containsKey(taskEvent.getProcessInstanceId())) { + TaskExecuteThread taskExecuteThread = new TaskExecuteThread( + taskEvent.getProcessInstanceId(), + processService, workflowExecuteThreadPool, + processInstanceExecCacheManager, + dataQualityResultOperator); + taskExecuteThreadMap.put(taskEvent.getProcessInstanceId(), taskExecuteThread); + } + TaskExecuteThread taskExecuteThread = taskExecuteThreadMap.get(taskEvent.getProcessInstanceId()); + if (taskExecuteThread != null) { + taskExecuteThread.addEvent(taskEvent); + } + } + + public void eventHandler() { + for (TaskExecuteThread taskExecuteThread: taskExecuteThreadMap.values()) { + executeEvent(taskExecuteThread); + } + } + + public void executeEvent(TaskExecuteThread taskExecuteThread) { + if (taskExecuteThread.eventSize() == 0) { + return; + } + if (multiThreadFilterMap.containsKey(taskExecuteThread.getKey())) { + return; + } + ListenableFuture future = this.submitListenable(() -> { + taskExecuteThread.run(); + multiThreadFilterMap.put(taskExecuteThread.getKey(), taskExecuteThread); + }); + future.addCallback(new ListenableFutureCallback() { + @Override + public void onFailure(Throwable ex) { + logger.error("handle event {} failed: {}", taskExecuteThread.getProcessInstanceId(), ex); + if (!processInstanceExecCacheManager.contains(taskExecuteThread.getProcessInstanceId())) { + taskExecuteThreadMap.remove(taskExecuteThread.getProcessInstanceId()); + logger.info("remove process instance: {}", taskExecuteThread.getProcessInstanceId()); + } + multiThreadFilterMap.remove(taskExecuteThread.getKey()); + } + + @Override + public void onSuccess(Object result) { + logger.info("persist events {} succeeded.", taskExecuteThread.getProcessInstanceId()); + if (!processInstanceExecCacheManager.contains(taskExecuteThread.getProcessInstanceId())) { + taskExecuteThreadMap.remove(taskExecuteThread.getProcessInstanceId()); + logger.info("remove process instance: {}", taskExecuteThread.getProcessInstanceId()); + } + multiThreadFilterMap.remove(taskExecuteThread.getKey()); + } + }); + } +} diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java index 9a054b647de9005fece3ca69edfcde2d39af3ca1..c5b00db12c72caf0da6484375afc1073d6eca847 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java @@ -67,6 +67,9 @@ public class TaskResponseServiceTest { @Mock private WorkflowExecuteThreadPool workflowExecuteThreadPool; + @Mock + private TaskExecuteThreadPool taskExecuteThreadPool; + @Before public void before() { taskEventService.start(); @@ -101,8 +104,6 @@ public class TaskResponseServiceTest { @Test public void testAddResponse() { - Mockito.when(processService.findTaskInstanceById(Mockito.any())).thenReturn(taskInstance); - Mockito.when(channel.writeAndFlush(Mockito.any())).thenReturn(null); taskEventService.addEvent(ackEvent); taskEventService.addEvent(resultEvent); }