未验证 提交 fd76436d 编写于 作者: W wangxj3 提交者: GitHub

[Feature-#8318] [master-server] Sub process transfer parameters to father process (#8522)

* load param from subprocess

* unit test

* modify code style

Co-authored-by: wangxj <wangxj31>
上级 df519f80
......@@ -17,14 +17,10 @@
package org.apache.dolphinscheduler.server.master.runner;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
......@@ -34,7 +30,6 @@ import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
......@@ -58,6 +53,7 @@ import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters;
import org.apache.dolphinscheduler.remote.command.HostUpdateCommand;
......@@ -71,9 +67,8 @@ import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
......@@ -89,11 +84,15 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
/**
* master exec thread,split dag
......@@ -448,7 +447,6 @@ public class WorkflowExecuteThread {
if (taskInstance.getState().typeIsSuccess()) {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
processInstance.setVarPool(taskInstance.getVarPool());
processService.saveProcessInstance(processInstance);
if (!processInstance.isBlocked()) {
submitPostNode(Long.toString(taskInstance.getTaskCode()));
......@@ -1280,6 +1278,24 @@ public class WorkflowExecuteThread {
TaskInstance task = createTaskInstance(processInstance, taskNodeObject);
taskInstances.add(task);
}
//the end node of the branch of the dag
if (StringUtils.isNotEmpty(parentNodeCode) && dag.getEndNode().contains(parentNodeCode)){
TaskInstance endTaskInstance = taskInstanceMap.get(completeTaskMap.get(NumberUtils.toLong(parentNodeCode)));
String taskInstanceVarPool = endTaskInstance.getVarPool();
if(StringUtils.isNotEmpty(taskInstanceVarPool)) {
Set<Property> taskProperties = JSONUtils.toList(taskInstanceVarPool, Property.class)
.stream().collect(Collectors.toSet());
String processInstanceVarPool = processInstance.getVarPool();
if (StringUtils.isNotEmpty(processInstanceVarPool)) {
Set<Property> properties = JSONUtils.toList(processInstanceVarPool, Property.class)
.stream().collect(Collectors.toSet());
properties.addAll(taskProperties);
processInstance.setVarPool(JSONUtils.toJsonString(properties));
}else{
processInstance.setVarPool(JSONUtils.toJsonString(taskProperties));
}
}
}
// if previous node success , post node submit
for (TaskInstance task : taskInstances) {
......
......@@ -17,21 +17,29 @@
package org.apache.dolphinscheduler.server.master.runner.task;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.auto.service.AutoService;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import com.google.auto.service.AutoService;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS;
/**
* subtask processor
......@@ -111,10 +119,40 @@ public class SubTaskProcessor extends BaseTaskProcessor {
if (subProcessInstance != null && subProcessInstance.getState().typeIsFinished()) {
taskInstance.setState(subProcessInstance.getState());
taskInstance.setEndTime(new Date());
dealFinish();
processService.saveTaskInstance(taskInstance);
}
}
/**
* get the params from subProcessInstance to this subProcessTask
*/
private void dealFinish() {
String thisTaskInstanceVarPool = taskInstance.getVarPool();
if (StringUtils.isNotEmpty(thisTaskInstanceVarPool)) {
String subProcessInstanceVarPool = subProcessInstance.getVarPool();
if (StringUtils.isNotEmpty(subProcessInstanceVarPool)) {
List<Property> varPoolProperties = JSONUtils.toList(thisTaskInstanceVarPool, Property.class);
Map<String, Object> taskParams = JSONUtils.parseObject(taskInstance.getTaskParams(), new TypeReference<Map<String, Object>>() {
});
Object localParams = taskParams.get(LOCAL_PARAMS);
if (localParams != null) {
List<Property> properties = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
Map<String, String> subProcessParam = JSONUtils.toList(subProcessInstanceVarPool, Property.class).stream()
.collect(Collectors.toMap(Property::getProp, Property::getValue));
List<Property> outProperties = properties.stream().filter(r -> Direct.OUT == r.getDirect()).collect(Collectors.toList());
for (Property info : outProperties) {
info.setValue(subProcessParam.get(info.getProp()));
varPoolProperties.add(info);
}
taskInstance.setVarPool(JSONUtils.toJsonString(varPoolProperties));
//deal with localParam for show in the page
processService.changeOutParam(taskInstance);
}
}
}
}
@Override
protected boolean pauseTask() {
pauseSubWorkFlow();
......@@ -180,4 +218,5 @@ public class SubTaskProcessor extends BaseTaskProcessor {
public String getType() {
return TASK_TYPE_SUB_PROCESS;
}
}
......@@ -17,19 +17,24 @@
package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.task.SubTaskProcessor;
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -39,8 +44,11 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.springframework.context.ApplicationContext;
import java.util.ArrayList;
import java.util.List;
@RunWith(PowerMockRunner.class)
@PrepareForTest({ Stopper.class })
@PrepareForTest({Stopper.class})
public class SubProcessTaskTest {
/**
......@@ -76,17 +84,17 @@ public class SubProcessTaskTest {
TaskInstance taskInstance = getTaskInstance();
Mockito.when(processService
.findProcessInstanceById(processInstance.getId()))
.findProcessInstanceById(processInstance.getId()))
.thenReturn(processInstance);
// for SubProcessTaskExecThread.setTaskInstanceState
Mockito.when(processService
.updateTaskInstance(Mockito.any()))
.updateTaskInstance(Mockito.any()))
.thenReturn(true);
// for MasterBaseTaskExecThread.submit
Mockito.when(processService
.submitTask(processInstance, taskInstance))
.submitTask(processInstance, taskInstance))
.thenAnswer(t -> t.getArgument(0));
TaskDefinition taskDefinition = new TaskDefinition();
......@@ -101,12 +109,13 @@ public class SubProcessTaskTest {
TaskInstance taskInstance = getTaskInstance(getTaskNode(), processInstance);
ProcessInstance subProcessInstance = getSubProcessInstance(expectResult);
subProcessInstance.setVarPool(getProperty());
// for SubProcessTaskExecThread.waitTaskQuit
Mockito.when(processService
.findProcessInstanceById(subProcessInstance.getId()))
.findProcessInstanceById(subProcessInstance.getId()))
.thenReturn(subProcessInstance);
Mockito.when(processService
.findSubProcessInstance(processInstance.getId(), taskInstance.getId()))
.findSubProcessInstance(processInstance.getId(), taskInstance.getId()))
.thenReturn(subProcessInstance);
return taskInstance;
......@@ -120,6 +129,34 @@ public class SubProcessTaskTest {
//Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecThread.getTaskInstance().getState());
}
@Test
public void testFinish() {
TaskInstance taskInstance = testBasicInit(ExecutionStatus.SUCCESS);
taskInstance.setVarPool(getProperty());
taskInstance.setTaskParams("{\"processDefinitionCode\":110," +
"\"dependence\":{},\"localParams\":[{\"prop\":\"key\"," +
"\"direct\":\"out\",\"type\":\"VARCHAR\",\"value\":\"\"}," +
"{\"prop\":\"database_name\",\"direct\":\"OUT\"," +
"\"type\":\"VARCHAR\",\"value\":\"\"}]," +
"\"conditionResult\":{\"successNode\":[],\"failedNode\":[]}," +
"\"waitStartTimeout\":{},\"switchResult\":{}}");
SubTaskProcessor subTaskProcessor = new SubTaskProcessor();
subTaskProcessor.init(taskInstance, processInstance);
subTaskProcessor.action(TaskAction.RUN);
ExecutionStatus status = taskInstance.getState();
Assert.assertEquals(ExecutionStatus.SUCCESS, status);
}
private String getProperty() {
List<Property> varPools = new ArrayList<>();
Property property = new Property();
property.setProp("key");
property.setValue("1");
property.setDirect(Direct.OUT);
varPools.add(property);
return JSONUtils.toJsonString(varPools);
}
@Test
public void testBasicFailure() {
TaskInstance taskInstance = testBasicInit(ExecutionStatus.FAILURE);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册