未验证 提交 36ebfe09 编写于 作者: K Kerwin 提交者: GitHub

[Implement][server]TaskProcessor code optimization (#7146)

* TaskProcessor code optimization

* fix code style

* update Collectors.toMap
上级 8f77fbbe
......@@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
......@@ -68,6 +69,8 @@ public class MasterSchedulerService extends Thread {
*/
@Autowired
private ProcessService processService;
@Autowired
private TaskProcessorFactory taskProcessorFactory;
/**
* zookeeper master client
......@@ -205,7 +208,8 @@ public class MasterSchedulerService extends Thread {
, nettyExecutorManager
, processAlertManager
, masterConfig
, taskTimeoutCheckList);
, taskTimeoutCheckList
, taskProcessorFactory);
this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteThread);
if (processInstance.getTimeout() > 0) {
......
......@@ -130,6 +130,11 @@ public class WorkflowExecuteThread implements Runnable {
*/
private ProcessDefinition processDefinition;
/**
* task processor
*/
private TaskProcessorFactory taskProcessorFactory;
/**
* the object of DAG
*/
......@@ -216,13 +221,18 @@ public class WorkflowExecuteThread implements Runnable {
* @param processInstance processInstance
* @param processService processService
* @param nettyExecutorManager nettyExecutorManager
* @param processAlertManager processAlertManager
* @param masterConfig masterConfig
* @param taskTimeoutCheckList taskTimeoutCheckList
* @param taskProcessorFactory taskProcessorFactory
*/
public WorkflowExecuteThread(ProcessInstance processInstance
, ProcessService processService
, NettyExecutorManager nettyExecutorManager
, ProcessAlertManager processAlertManager
, MasterConfig masterConfig
, ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList) {
, ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList
, TaskProcessorFactory taskProcessorFactory) {
this.processService = processService;
this.processInstance = processInstance;
......@@ -230,6 +240,7 @@ public class WorkflowExecuteThread implements Runnable {
this.nettyExecutorManager = nettyExecutorManager;
this.processAlertManager = processAlertManager;
this.taskTimeoutCheckList = taskTimeoutCheckList;
this.taskProcessorFactory = taskProcessorFactory;
}
@Override
......@@ -791,7 +802,7 @@ public class WorkflowExecuteThread implements Runnable {
*/
private TaskInstance submitTaskExec(TaskInstance taskInstance) {
try {
ITaskProcessor taskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
ITaskProcessor taskProcessor = taskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION
&& taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
notifyProcessHostUpdate(taskInstance);
......
......@@ -40,7 +40,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
......@@ -62,6 +61,7 @@ import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import com.google.common.base.Enums;
import com.google.common.base.Strings;
......@@ -80,7 +80,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
protected ProcessInstance processInstance;
protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
@Autowired
protected ProcessService processService;
/**
* pause task, common tasks donot need this.
......@@ -107,7 +108,6 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
public void run() {
}
@Override
public boolean action(TaskAction taskAction) {
......@@ -119,7 +119,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
case TIMEOUT:
return timeout();
default:
logger.error("unknown task action: {}", taskAction.toString());
logger.error("unknown task action: {}", taskAction);
}
return false;
......@@ -305,7 +305,6 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
}
}
/**
* set SQL task relation
*
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.Constants;
import com.google.auto.service.AutoService;
@AutoService(ITaskProcessFactory.class)
public class CommonTaskProcessFactory implements ITaskProcessFactory {
@Override
public String type() {
return Constants.COMMON_TASK_TYPE;
}
@Override
public ITaskProcessor create() {
return new CommonTaskProcessor();
}
}
......@@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
......@@ -40,20 +39,19 @@ import org.apache.commons.lang.StringUtils;
import java.util.Date;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* common task processor
*/
@Service
public class CommonTaskProcessor extends BaseTaskProcessor {
@Autowired
private TaskPriorityQueue taskUpdateQueue;
@Autowired
MasterConfig masterConfig;
@Autowired
NettyExecutorManager nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class);
NettyExecutorManager nettyExecutorManager;
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.enums.TaskType;
import com.google.auto.service.AutoService;
@AutoService(ITaskProcessFactory.class)
public class ConditionTaskProcessFactory implements ITaskProcessFactory {
@Override
public String type() {
return TaskType.CONDITIONS.getDesc();
}
@Override
public ITaskProcessor create() {
return new ConditionTaskProcessor();
}
}
......@@ -26,14 +26,12 @@ import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.ArrayList;
import java.util.Date;
......@@ -41,11 +39,13 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* condition task processor
*/
@Service
public class ConditionTaskProcessor extends BaseTaskProcessor {
/**
......@@ -65,7 +65,8 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
*/
private Map<Long, ExecutionStatus> completeTaskList = new ConcurrentHashMap<>();
MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
@Autowired
private MasterConfig masterConfig;
private TaskDefinition taskDefinition;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.enums.TaskType;
import com.google.auto.service.AutoService;
@AutoService(ITaskProcessFactory.class)
public class DependentTaskProcessFactory implements ITaskProcessFactory {
@Override
public String type() {
return TaskType.DEPENDENT.getDesc();
}
@Override
public ITaskProcessor create() {
return new DependentTaskProcessor();
}
}
......@@ -33,7 +33,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.DependentExecute;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.ArrayList;
import java.util.Date;
......@@ -41,11 +40,15 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.annotation.JsonFormat;
/**
* dependent task processor
*/
@Service
public class DependentTaskProcessor extends BaseTaskProcessor {
private DependentParameters dependentParameters;
......@@ -72,7 +75,8 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
ProcessInstance processInstance;
TaskDefinition taskDefinition;
MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
@Autowired
private MasterConfig masterConfig;
boolean allDependentItemFinished;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task;
public interface ITaskProcessFactory {
String type();
ITaskProcessor create();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.enums.TaskType;
import com.google.auto.service.AutoService;
@AutoService(ITaskProcessFactory.class)
public class SubTaskProcessFactory implements ITaskProcessFactory {
@Override
public String type() {
return TaskType.SUB_PROCESS.getDesc();
}
@Override
public ITaskProcessor create() {
return new SubTaskProcessor();
}
}
......@@ -25,15 +25,18 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Date;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
*
*/
@Service
public class SubTaskProcessor extends BaseTaskProcessor {
private ProcessInstance processInstance;
......@@ -46,7 +49,8 @@ public class SubTaskProcessor extends BaseTaskProcessor {
*/
private final Lock runLock = new ReentrantLock();
private StateEventCallbackService stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class);
@Autowired
private StateEventCallbackService stateEventCallbackService;
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.enums.TaskType;
import com.google.auto.service.AutoService;
@AutoService(ITaskProcessFactory.class)
public class SwitchTaskProcessFactory implements ITaskProcessFactory {
@Override
public String type() {
return TaskType.SWITCH.getDesc();
}
@Override
public ITaskProcessor create() {
return new SwitchTaskProcessor();
}
}
......@@ -31,7 +31,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.commons.lang.StringUtils;
......@@ -43,6 +42,10 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class SwitchTaskProcessor extends BaseTaskProcessor {
protected final String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
......@@ -52,7 +55,8 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
private ProcessInstance processInstance;
TaskDefinition taskDefinition;
MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
@Autowired
private MasterConfig masterConfig;
/**
* switch result
......
......@@ -17,37 +17,43 @@
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.Constants;
import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import com.google.common.base.Strings;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* the factory to create task processor
*/
@Service
public class TaskProcessorFactory {
public static final Map<String, ITaskProcessFactory> PROCESS_FACTORY_MAP = new ConcurrentHashMap<>();
private static final String DEFAULT_PROCESSOR = COMMON_TASK_TYPE;
private static final String DEFAULT_PROCESSOR = Constants.COMMON_TASK_TYPE;
private Map<String, ITaskProcessor> taskProcessorMap;
static {
for (ITaskProcessFactory iTaskProcessor : ServiceLoader.load(ITaskProcessFactory.class)) {
PROCESS_FACTORY_MAP.put(iTaskProcessor.type(), iTaskProcessor);
}
@Autowired
public TaskProcessorFactory(List<ITaskProcessor> taskProcessors) {
taskProcessorMap = taskProcessors.stream().collect(Collectors.toMap(ITaskProcessor::getType, Function.identity(), (v1, v2) -> v2));
}
public static ITaskProcessor getTaskProcessor(String type) {
if (Strings.isNullOrEmpty(type)) {
return PROCESS_FACTORY_MAP.get(DEFAULT_PROCESSOR).create();
public ITaskProcessor getTaskProcessor(String key) {
if (StringUtils.isEmpty(key)) {
key = DEFAULT_PROCESSOR;
}
if (!PROCESS_FACTORY_MAP.containsKey(type)) {
return PROCESS_FACTORY_MAP.get(DEFAULT_PROCESSOR).create();
ITaskProcessor taskProcessor = taskProcessorMap.get(key);
if (Objects.isNull(taskProcessor)) {
taskProcessor = taskProcessorMap.get(DEFAULT_PROCESSOR);
}
return PROCESS_FACTORY_MAP.get(type).create();
}
return taskProcessor;
}
}
......@@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.lang.reflect.Field;
......@@ -81,9 +82,12 @@ public class WorkflowExecuteThreadTest {
private ApplicationContext applicationContext;
private TaskProcessorFactory taskProcessorFactory;
@Before
public void init() throws Exception {
processService = mock(ProcessService.class);
taskProcessorFactory = mock(TaskProcessorFactory.class);
applicationContext = mock(ApplicationContext.class);
config = new MasterConfig();
......@@ -104,7 +108,7 @@ public class WorkflowExecuteThreadTest {
Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition);
ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList = new ConcurrentHashMap<>();
workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, taskTimeoutCheckList));
workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, taskTimeoutCheckList, taskProcessorFactory));
// prepareProcess init dag
Field dag = WorkflowExecuteThread.class.getDeclaredField("dag");
dag.setAccessible(true);
......
......@@ -26,13 +26,14 @@ import org.junit.Test;
@Ignore
public class TaskProcessorFactoryTest {
private TaskProcessorFactory taskProcessorFactory;
@Test
public void testFactory() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskType("shell");
ITaskProcessor iTaskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
ITaskProcessor iTaskProcessor = taskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
Assert.assertNotNull(iTaskProcessor);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册