diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 7b2a1cdcbf7c7a3994b0494a869930d15458583b..0a892651d2602d757e3097dbed08ed6c713880d8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -30,13 +30,10 @@ import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProce import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient; import org.apache.dolphinscheduler.server.master.runner.EventExecuteService; -import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.quartz.QuartzExecutors; -import java.util.concurrent.ConcurrentHashMap; - import javax.annotation.PostConstruct; import org.quartz.SchedulerException; @@ -100,8 +97,6 @@ public class MasterServer implements IStoppable { @Autowired private EventExecuteService eventExecuteService; - private ConcurrentHashMap processInstanceExecMaps = new ConcurrentHashMap<>(); - /** * master server startup, not use web service * @@ -121,27 +116,21 @@ public class MasterServer implements IStoppable { NettyServerConfig serverConfig = new NettyServerConfig(); serverConfig.setListenPort(masterConfig.getListenPort()); this.nettyRemotingServer = new NettyRemotingServer(serverConfig); - TaskAckProcessor ackProcessor = new TaskAckProcessor(); - ackProcessor.init(processInstanceExecMaps); - TaskResponseProcessor taskResponseProcessor = new TaskResponseProcessor(); - taskResponseProcessor.init(processInstanceExecMaps); - StateEventProcessor stateEventProcessor = new StateEventProcessor(); - stateEventProcessor.init(processInstanceExecMaps); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, ackProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor()); - this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, new StateEventProcessor()); this.nettyRemotingServer.start(); // self tolerant - this.masterRegistryClient.init(this.processInstanceExecMaps); + this.masterRegistryClient.init(); this.masterRegistryClient.start(); this.masterRegistryClient.setRegistryStoppable(this); - this.eventExecuteService.init(this.processInstanceExecMaps); + this.eventExecuteService.init(); this.eventExecuteService.start(); // scheduler start - this.masterSchedulerService.init(this.processInstanceExecMaps); + this.masterSchedulerService.init(); this.masterSchedulerService.start(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java new file mode 100644 index 0000000000000000000000000000000000000000..5fdf64493cf672fa70a18bcd579b6b2d8ea0ab2b --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.cache; + +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; + +import java.util.Collection; + +/** + * cache of process instance id and WorkflowExecuteThread + */ +public interface ProcessInstanceExecCacheManager { + + /** + * get WorkflowExecuteThread by process instance id + * + * @param processInstanceId processInstanceId + * @return WorkflowExecuteThread + */ + WorkflowExecuteThread getByProcessInstanceId(int processInstanceId); + + /** + * judge the process instance does it exist + * + * @param processInstanceId processInstanceId + * @return true - if process instance id exists in cache + */ + boolean contains(int processInstanceId); + + /** + * remove cache by process instance id + * + * @param processInstanceId processInstanceId + */ + void removeByProcessInstanceId(int processInstanceId); + + /** + * cache + * + * @param processInstanceId processInstanceId + * @param workflowExecuteThread if it is null, will not be cached + */ + void cache(int processInstanceId, WorkflowExecuteThread workflowExecuteThread); + + /** + * get all WorkflowExecuteThread from cache + * + * @return all WorkflowExecuteThread in cache + */ + Collection getAll(); +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..1d0ab4841aafb02119d9b01d8cf172587286acad --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.cache.impl; + +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; + +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; + +import org.springframework.stereotype.Component; + +import com.google.common.collect.ImmutableList; + +/** + * cache of process instance id and WorkflowExecuteThread + */ +@Component +public class ProcessInstanceExecCacheManagerImpl implements ProcessInstanceExecCacheManager { + + private final ConcurrentHashMap processInstanceExecMaps = new ConcurrentHashMap<>(); + + @Override + public WorkflowExecuteThread getByProcessInstanceId(int processInstanceId) { + return processInstanceExecMaps.get(processInstanceId); + } + + @Override + public boolean contains(int processInstanceId) { + return processInstanceExecMaps.containsKey(processInstanceId); + } + + @Override + public void removeByProcessInstanceId(int processInstanceId) { + processInstanceExecMaps.remove(processInstanceId); + } + + @Override + public void cache(int processInstanceId, WorkflowExecuteThread workflowExecuteThread) { + if (workflowExecuteThread == null) { + return; + } + processInstanceExecMaps.put(processInstanceId, workflowExecuteThread); + } + + @Override + public Collection getAll() { + return ImmutableList.copyOf(processInstanceExecMaps.values()); + } +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java index 2f9a6342501e5b2d71c51607ae08e4d1d645084a..b03940395d8ce6d93d1492ad3f687d751fa8bb59 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java @@ -26,11 +26,8 @@ import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.server.master.processor.queue.StateEventResponseService; -import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import java.util.concurrent.ConcurrentHashMap; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,10 +48,6 @@ public class StateEventProcessor implements NettyRequestProcessor { stateEventResponseService = SpringApplicationContext.getBean(StateEventResponseService.class); } - public void init(ConcurrentHashMap processInstanceExecMaps) { - this.stateEventResponseService.init(processInstanceExecMaps); - } - @Override public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.STATE_EVENT_REQUEST == command.getType(), String.format("invalid command type: %s", command.getType())); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java index 15f97c17a574708d98e4ee2cea5e25f001651634..87612321c3ff208bda687284c00d329feaab5962 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java @@ -28,11 +28,8 @@ import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; -import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import java.util.concurrent.ConcurrentHashMap; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,10 +59,6 @@ public class TaskAckProcessor implements NettyRequestProcessor { this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); } - public void init(ConcurrentHashMap processInstanceExecMaps) { - this.taskResponseService.init(processInstanceExecMaps); - } - /** * task ack process * diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java index 5c6ade7fccb3377efd7eb8f103c6f722438b9ebc..405e6be48f02094109600cb98d4ef4d7f8168608 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java @@ -27,11 +27,8 @@ import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; -import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import java.util.concurrent.ConcurrentHashMap; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,10 +58,6 @@ public class TaskResponseProcessor implements NettyRequestProcessor { this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); } - public void init(ConcurrentHashMap processInstanceExecMaps) { - this.taskResponseService.init(processInstanceExecMaps); - } - /** * task final result response * need master process , state persistence diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java index bc9c77cfaf924726757feefcad70793f5b6df8e3..72e23553253a762af27ef3e9ab55865fc59706e6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java @@ -21,12 +21,12 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.StateEvent; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import javax.annotation.PostConstruct; @@ -34,6 +34,7 @@ import javax.annotation.PreDestroy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import io.netty.channel.Channel; @@ -59,13 +60,8 @@ public class StateEventResponseService { */ private Thread responseWorker; - private ConcurrentHashMap processInstanceMapper; - - public void init(ConcurrentHashMap processInstanceMapper) { - if (this.processInstanceMapper == null) { - this.processInstanceMapper = processInstanceMapper; - } - } + @Autowired + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; @PostConstruct public void start() { @@ -131,12 +127,12 @@ public class StateEventResponseService { private void persist(StateEvent stateEvent) { try { - if (!this.processInstanceMapper.containsKey(stateEvent.getProcessInstanceId())) { + if (!this.processInstanceExecCacheManager.contains(stateEvent.getProcessInstanceId())) { writeResponse(stateEvent, ExecutionStatus.FAILURE); return; } - WorkflowExecuteThread workflowExecuteThread = this.processInstanceMapper.get(stateEvent.getProcessInstanceId()); + WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId()); workflowExecuteThread.addStateEvent(stateEvent); writeResponse(stateEvent, ExecutionStatus.SUCCESS); } catch (Exception e) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java index 27b96e14d8c11d69261a4dc2591e3601bac61b97..9af1ae243b26c15e5d1912d0d84ebceb72173c8b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java @@ -25,13 +25,13 @@ import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand; import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; import org.apache.dolphinscheduler.service.process.ProcessService; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import javax.annotation.PostConstruct; @@ -71,13 +71,8 @@ public class TaskResponseService { */ private Thread taskResponseWorker; - private ConcurrentHashMap processInstanceMapper; - - public void init(ConcurrentHashMap processInstanceMapper) { - if (this.processInstanceMapper == null) { - this.processInstanceMapper = processInstanceMapper; - } - } + @Autowired + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; @PostConstruct public void start() { @@ -194,7 +189,7 @@ public class TaskResponseService { default: throw new IllegalArgumentException("invalid event type : " + event); } - WorkflowExecuteThread workflowExecuteThread = this.processInstanceMapper.get(taskResponseEvent.getProcessInstanceId()); + WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(taskResponseEvent.getProcessInstanceId()); if (workflowExecuteThread != null) { StateEvent stateEvent = new StateEvent(); stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java index 7bae6de162012c16cf1fa28f3e1637166de1ed35..8b5aca0a1555d532e52b838c02490bd154ac7ed7 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java @@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; import org.apache.dolphinscheduler.server.registry.HeartBeatTask; @@ -48,7 +49,6 @@ import org.apache.commons.lang.StringUtils; import java.util.Date; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -92,7 +92,8 @@ public class MasterRegistryClient { */ private ScheduledExecutorService heartBeatExecutor; - private ConcurrentHashMap processInstanceExecMaps; + @Autowired + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; /** * master startup time, ms @@ -101,11 +102,10 @@ public class MasterRegistryClient { private String localNodePath; - public void init(ConcurrentHashMap processInstanceExecMaps) { + public void init() { this.startupTime = System.currentTimeMillis(); this.registryClient = RegistryClient.getInstance(); this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor")); - this.processInstanceExecMaps = processInstanceExecMaps; } public void start() { @@ -308,10 +308,10 @@ public class MasterRegistryClient { taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); processService.saveTaskInstance(taskInstance); - if (!processInstanceExecMaps.containsKey(processInstance.getId())) { + if (!processInstanceExecCacheManager.contains(processInstance.getId())) { return; } - WorkflowExecuteThread workflowExecuteThreadNotify = processInstanceExecMaps.get(processInstance.getId()); + WorkflowExecuteThread workflowExecuteThreadNotify = processInstanceExecCacheManager.getByProcessInstanceId(processInstance.getId()); StateEvent stateEvent = new StateEvent(); stateEvent.setTaskInstanceId(taskInstance.getId()); stateEvent.setType(StateEventType.TASK_STATE_CHANGE); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java index 7c4b32130d675210c4046cd621c2b05823a083f8..49e12e5f67584e290042d397e54b82ab51f57ea9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java @@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -72,17 +73,16 @@ public class EventExecuteService extends Thread { */ private StateEventCallbackService stateEventCallbackService; + @Autowired + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; - private ConcurrentHashMap processInstanceExecMaps; private ConcurrentHashMap eventHandlerMap = new ConcurrentHashMap(); ListeningExecutorService listeningExecutorService; - public void init(ConcurrentHashMap processInstanceExecMaps) { + public void init() { eventExecService = ThreadUtils.newDaemonFixedThreadExecutor("MasterEventExecution", masterConfig.getMasterExecThreads()); - this.processInstanceExecMaps = processInstanceExecMaps; - listeningExecutorService = MoreExecutors.listeningDecorator(eventExecService); this.stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class); @@ -115,7 +115,7 @@ public class EventExecuteService extends Thread { } private void eventHandler() { - for (WorkflowExecuteThread workflowExecuteThread : this.processInstanceExecMaps.values()) { + for (WorkflowExecuteThread workflowExecuteThread : this.processInstanceExecCacheManager.getAll()) { if (workflowExecuteThread.eventSize() == 0 || StringUtils.isEmpty(workflowExecuteThread.getKey()) || eventHandlerMap.containsKey(workflowExecuteThread.getKey())) { @@ -132,13 +132,13 @@ public class EventExecuteService extends Thread { @Override public void onSuccess(Object o) { if (workflowExecuteThread.workFlowFinish()) { - processInstanceExecMaps.remove(processInstanceId); + processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId); notifyProcessChanged(); logger.info("process instance {} finished.", processInstanceId); } if (workflowExecuteThread.getProcessInstance().getId() != processInstanceId) { - processInstanceExecMaps.remove(processInstanceId); - processInstanceExecMaps.put(workflowExecuteThread.getProcessInstance().getId(), workflowExecuteThread); + processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId); + processInstanceExecCacheManager.cache(workflowExecuteThread.getProcessInstance().getId(), workflowExecuteThread); } eventHandlerMap.remove(workflowExecuteThread.getKey()); @@ -160,10 +160,10 @@ public class EventExecuteService extends Thread { private void notifyMyself(ProcessInstance processInstance, TaskInstance taskInstance) { logger.info("notify process {} task {} state change", processInstance.getId(), taskInstance.getId()); - if (!processInstanceExecMaps.containsKey(processInstance.getId())) { + if (!processInstanceExecCacheManager.contains(processInstance.getId())) { return; } - WorkflowExecuteThread workflowExecuteThreadNotify = processInstanceExecMaps.get(processInstance.getId()); + WorkflowExecuteThread workflowExecuteThreadNotify = processInstanceExecCacheManager.getByProcessInstanceId(processInstance.getId()); StateEvent stateEvent = new StateEvent(); stateEvent.setTaskInstanceId(taskInstance.getId()); stateEvent.setType(StateEventType.TASK_STATE_CHANGE); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index 803ba09bccaa96f7f243adccb1dbf8df1e50ba26..ad1ffe6632249d6124de234ae9b92d6384349ac7 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient; @@ -94,10 +95,9 @@ public class MasterSchedulerService extends Thread { */ private ThreadPoolExecutor masterExecService; - /** - * process instance execution list - */ - private ConcurrentHashMap processInstanceExecMaps; + @Autowired + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; + /** * process timeout check list */ @@ -119,15 +119,14 @@ public class MasterSchedulerService extends Thread { /** * constructor of MasterSchedulerService */ - public void init(ConcurrentHashMap processInstanceExecMaps) { - this.processInstanceExecMaps = processInstanceExecMaps; + public void init() { this.masterExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads()); NettyClientConfig clientConfig = new NettyClientConfig(); this.nettyRemotingClient = new NettyRemotingClient(clientConfig); stateWheelExecuteThread = new StateWheelExecuteThread(processTimeoutCheckList, taskTimeoutCheckList, - this.processInstanceExecMaps, + this.processInstanceExecCacheManager, masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS); } @@ -203,7 +202,7 @@ public class MasterSchedulerService extends Thread { , masterConfig , taskTimeoutCheckList); - this.processInstanceExecMaps.put(processInstance.getId(), workflowExecuteThread); + this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteThread); if (processInstance.getTimeout() > 0) { this.processTimeoutCheckList.put(processInstance.getId(), processInstance); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index f2b10f7898381f7ead7f1aa4e78186b62492727f..69bd3cb4ad30eea0a6a6c66ef56db8f505f2bd25 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.hadoop.util.ThreadUtil; @@ -44,17 +45,17 @@ public class StateWheelExecuteThread extends Thread { ConcurrentHashMap processInstanceCheckList; ConcurrentHashMap taskInstanceCheckList; - private ConcurrentHashMap processInstanceExecMaps; + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; private int stateCheckIntervalSecs; public StateWheelExecuteThread(ConcurrentHashMap processInstances, ConcurrentHashMap taskInstances, - ConcurrentHashMap processInstanceExecMaps, + ProcessInstanceExecCacheManager processInstanceExecCacheManager, int stateCheckIntervalSecs) { this.processInstanceCheckList = processInstances; this.taskInstanceCheckList = taskInstances; - this.processInstanceExecMaps = processInstanceExecMaps; + this.processInstanceExecCacheManager = processInstanceExecCacheManager; this.stateCheckIntervalSecs = stateCheckIntervalSecs; } @@ -121,10 +122,10 @@ public class StateWheelExecuteThread extends Thread { private void putEvent(StateEvent stateEvent) { - if (!processInstanceExecMaps.containsKey(stateEvent.getProcessInstanceId())) { + if (!processInstanceExecCacheManager.contains(stateEvent.getProcessInstanceId())) { return; } - WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecMaps.get(stateEvent.getProcessInstanceId()); + WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId()); workflowExecuteThread.addStateEvent(stateEvent); } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java new file mode 100644 index 0000000000000000000000000000000000000000..6fa9116e63e8a610bc966dba42228e810dae547e --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.cache.impl; + +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; + +import java.util.Collection; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ProcessInstanceExecCacheManagerImplTest { + + @InjectMocks + private ProcessInstanceExecCacheManagerImpl processInstanceExecCacheManager; + + @Mock + private WorkflowExecuteThread workflowExecuteThread; + + @Before + public void before() { + Mockito.when(workflowExecuteThread.getKey()).thenReturn("workflowExecuteThread1"); + processInstanceExecCacheManager.cache(1, workflowExecuteThread); + } + + @Test + public void testGetByProcessInstanceId() { + WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(1); + Assert.assertEquals("workflowExecuteThread1", workflowExecuteThread.getKey()); + } + + @Test + public void testContains() { + Assert.assertTrue(processInstanceExecCacheManager.contains(1)); + } + + @Test + public void testCacheNull() { + processInstanceExecCacheManager.cache(2, null); + WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(2); + Assert.assertNull(workflowExecuteThread); + } + + @Test + public void testRemoveByProcessInstanceId() { + processInstanceExecCacheManager.removeByProcessInstanceId(1); + WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(1); + Assert.assertNull(workflowExecuteThread); + } + + @Test + public void testGetAll() { + Collection workflowExecuteThreads = processInstanceExecCacheManager.getAll(); + Assert.assertEquals(1, workflowExecuteThreads.size()); + } +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java index 878446c30cb60f9b128330d9f96c4839f6bdb42a..72ea4b903fcfab3429f9db2c0ea4b771816aa5b2 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.processor.queue; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecCacheManagerImpl; import org.apache.dolphinscheduler.service.process.ProcessService; import java.util.Date; @@ -52,6 +53,9 @@ public class TaskResponseServiceTest { private TaskInstance taskInstance; + @Mock + private ProcessInstanceExecCacheManagerImpl processInstanceExecCacheManager; + @Before public void before() { taskRspService.start(); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java index a91c6e28ee19da1ce64e07e1d25bd17c8d464f4c..073df65b8438e5d3321ac09b49dea987414748de 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java @@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecCacheManagerImpl; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.registry.RegistryClient; @@ -66,6 +67,9 @@ public class MasterRegistryClientTest { @Mock private ProcessService processService; + @Mock + private ProcessInstanceExecCacheManagerImpl processInstanceExecCacheManager; + @Before public void before() throws Exception { PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class)); diff --git a/pom.xml b/pom.xml index 89fa0c0434ecd38b0df06489f994c484bdb9e175..b453ce9b997fe502464016873dd023f1d5895507 100644 --- a/pom.xml +++ b/pom.xml @@ -1046,6 +1046,7 @@ **/server/worker/task/sql/SqlTaskTest.java **/server/worker/runner/TaskExecuteThreadTest.java **/server/worker/runner/WorkerManagerThreadTest.java + **/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java **/service/quartz/cron/CronUtilsTest.java **/service/process/ProcessServiceTest.java **/service/registry/RegistryClientTest.java