提交 54f99310 编写于 作者: C caishunfeng 提交者: Wenjun Ruan

[Feature] add state history for process instance (#97)

* add state history for process instance

* upsertProcessInstance
上级 0ee8dfa9
......@@ -62,6 +62,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
......@@ -123,6 +124,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
@Autowired
private ProcessService processService;
@Autowired
private ProcessInstanceDao processInstanceDao;
@Autowired
private StateEventCallbackService stateEventCallbackService;
......@@ -507,8 +511,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
processInstance.setCommandType(commandType);
processInstance.addHistoryCmd(commandType);
processInstance.setState(executionStatus);
int update = processService.updateProcessInstance(processInstance);
processInstance.setStateWithDesc(executionStatus, commandType.getDescp() + "by ui");
int update = processInstanceDao.updateProcessInstance(processInstance);
// determine whether the process is normal
if (update > 0) {
......
......@@ -67,6 +67,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
......@@ -121,6 +122,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
@Autowired
ProcessInstanceMapper processInstanceMapper;
@Autowired
ProcessInstanceDao processInstanceDao;
@Autowired
ProcessDefinitionMapper processDefineMapper;
......@@ -548,7 +552,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
processInstance.setProcessDefinitionVersion(insertVersion);
int update = processService.updateProcessInstance(processInstance);
int update = processInstanceDao.updateProcessInstance(processInstance);
if (update == 0) {
putMsg(result, Status.UPDATE_PROCESS_INSTANCE_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_INSTANCE_ERROR);
......
......@@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
......@@ -95,6 +96,9 @@ public class ProcessInstanceServiceTest {
@Mock
ProcessService processService;
@Mock
ProcessInstanceDao processInstanceDao;
@Mock
ProcessInstanceMapper processInstanceMapper;
......@@ -449,7 +453,7 @@ public class ProcessInstanceServiceTest {
when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
when(tenantMapper.queryByTenantCode("root")).thenReturn(tenant);
when(processService.getTenantForProcess(Mockito.anyInt(), Mockito.anyInt())).thenReturn(tenant);
when(processService.updateProcessInstance(processInstance)).thenReturn(1);
when(processInstanceDao.updateProcessInstance(processInstance)).thenReturn(1);
when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.FALSE)).thenReturn(1);
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
......
......@@ -17,11 +17,6 @@
package org.apache.dolphinscheduler.dao.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.google.common.base.Strings;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
......@@ -29,14 +24,32 @@ import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Strings;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
/**
* process instance
*/
@Data
@TableName("t_ds_process_instance")
public class ProcessInstance {
......@@ -60,6 +73,18 @@ public class ProcessInstance {
* process state
*/
private ExecutionStatus state;
/**
* state history
*/
private String stateHistory;
/**
* state desc list from state history
*/
@TableField(exist = false)
private List<StateDesc> stateDescList;
/**
* recovery flag for failover
*/
......@@ -271,266 +296,10 @@ public class ProcessInstance {
DateUtils.getCurrentTimeStamp());
}
public String getVarPool() {
return varPool;
}
public void setVarPool(String varPool) {
this.varPool = varPool;
}
public ProcessDefinition getProcessDefinition() {
return processDefinition;
}
public void setProcessDefinition(ProcessDefinition processDefinition) {
this.processDefinition = processDefinition;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public ExecutionStatus getState() {
return state;
}
public void setState(ExecutionStatus state) {
this.state = state;
}
public Flag getRecovery() {
return recovery;
}
public void setRecovery(Flag recovery) {
this.recovery = recovery;
}
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public Date getEndTime() {
return endTime;
}
public void setEndTime(Date endTime) {
this.endTime = endTime;
}
public int getRunTimes() {
return runTimes;
}
public void setRunTimes(int runTimes) {
this.runTimes = runTimes;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public CommandType getCommandType() {
return commandType;
}
public void setCommandType(CommandType commandType) {
this.commandType = commandType;
}
public String getCommandParam() {
return commandParam;
}
public void setCommandParam(String commandParam) {
this.commandParam = commandParam;
}
public TaskDependType getTaskDependType() {
return taskDependType;
}
public void setTaskDependType(TaskDependType taskDependType) {
this.taskDependType = taskDependType;
}
public int getMaxTryTimes() {
return maxTryTimes;
}
public void setMaxTryTimes(int maxTryTimes) {
this.maxTryTimes = maxTryTimes;
}
public FailureStrategy getFailureStrategy() {
return failureStrategy;
}
public void setFailureStrategy(FailureStrategy failureStrategy) {
this.failureStrategy = failureStrategy;
}
public boolean isProcessInstanceStop() {
return this.state.typeIsFinished();
}
public WarningType getWarningType() {
return warningType;
}
public void setWarningType(WarningType warningType) {
this.warningType = warningType;
}
public Integer getWarningGroupId() {
return warningGroupId;
}
public void setWarningGroupId(Integer warningGroupId) {
this.warningGroupId = warningGroupId;
}
public Date getScheduleTime() {
return scheduleTime;
}
public void setScheduleTime(Date scheduleTime) {
this.scheduleTime = scheduleTime;
}
public Date getCommandStartTime() {
return commandStartTime;
}
public void setCommandStartTime(Date commandStartTime) {
this.commandStartTime = commandStartTime;
}
public String getGlobalParams() {
return globalParams;
}
public void setGlobalParams(String globalParams) {
this.globalParams = globalParams;
}
public DagData getDagData() {
return dagData;
}
public void setDagData(DagData dagData) {
this.dagData = dagData;
}
public String getTenantCode() {
return tenantCode;
}
public void setTenantCode(String tenantCode) {
this.tenantCode = tenantCode;
}
public String getQueue() {
return queue;
}
public void setQueue(String queue) {
this.queue = queue;
}
public int getExecutorId() {
return executorId;
}
public void setExecutorId(int executorId) {
this.executorId = executorId;
}
public Flag getIsSubProcess() {
return isSubProcess;
}
public void setIsSubProcess(Flag isSubProcess) {
this.isSubProcess = isSubProcess;
}
public Priority getProcessInstancePriority() {
return processInstancePriority;
}
public void setProcessInstancePriority(Priority processInstancePriority) {
this.processInstancePriority = processInstancePriority;
}
public String getLocations() {
return locations;
}
public void setLocations(String locations) {
this.locations = locations;
}
public String getHistoryCmd() {
return historyCmd;
}
public void setHistoryCmd(String historyCmd) {
this.historyCmd = historyCmd;
}
public String getExecutorName() {
return executorName;
}
public void setExecutorName(String executorName) {
this.executorName = executorName;
}
public Long getEnvironmentCode() {
return this.environmentCode;
}
public void setEnvironmentCode(Long environmentCode) {
this.environmentCode = environmentCode;
}
public int getDryRun() {
return dryRun;
}
public void setDryRun(int dryRun) {
this.dryRun = dryRun;
}
public Date getRestartTime() {
return restartTime;
}
public void setRestartTime(Date restartTime) {
this.restartTime = restartTime;
}
/**
* add command to history
*
......@@ -569,68 +338,20 @@ public class ProcessInstance {
return commandType;
}
public String getDependenceScheduleTimes() {
return dependenceScheduleTimes;
}
public void setDependenceScheduleTimes(String dependenceScheduleTimes) {
this.dependenceScheduleTimes = dependenceScheduleTimes;
}
public String getDuration() {
return duration;
}
public void setDuration(String duration) {
this.duration = duration;
}
public String getWorkerGroup() {
return workerGroup;
}
public void setWorkerGroup(String workerGroup) {
this.workerGroup = workerGroup;
}
public int getTimeout() {
return timeout;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
public int getTenantId() {
return this.tenantId;
}
public void setTenantId(int tenantId) {
this.tenantId = tenantId;
}
public Long getProcessDefinitionCode() {
return processDefinitionCode;
}
public void setProcessDefinitionCode(Long processDefinitionCode) {
this.processDefinitionCode = processDefinitionCode;
}
public int getProcessDefinitionVersion() {
return processDefinitionVersion;
}
public void setProcessDefinitionVersion(int processDefinitionVersion) {
this.processDefinitionVersion = processDefinitionVersion;
}
public boolean isBlocked() {
return isBlocked;
}
public void setBlocked(boolean blocked) {
isBlocked = blocked;
/**
* set state with desc
* @param state
* @param stateDesc
*/
public void setStateWithDesc(ExecutionStatus state, String stateDesc) {
this.setState(state);
if (StringUtils.isEmpty(this.getStateHistory())) {
stateDescList = new ArrayList<>();
} else if (stateDescList == null) {
stateDescList = JSONUtils.toList(this.getStateHistory(), StateDesc.class);
}
stateDescList.add(new StateDesc(new Date(), state, stateDesc));
this.setStateHistory(JSONUtils.toJsonString(stateDescList));
}
@Override
......@@ -735,11 +456,12 @@ public class ProcessInstance {
return Objects.hash(id);
}
public int getNextProcessInstanceId() {
return nextProcessInstanceId;
}
public void setNextProcessInstanceId(int nextProcessInstanceId) {
this.nextProcessInstanceId = nextProcessInstanceId;
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class StateDesc {
Date time;
ExecutionStatus state;
String desc;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
public interface ProcessInstanceDao {
public int insertProcessInstance(ProcessInstance processInstance);
public int updateProcessInstance(ProcessInstance processInstance);
/**
* insert or update work process instance to database
*
* @param processInstance processInstance
*/
public int upsertProcessInstance(ProcessInstance processInstance);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Repository
public class ProcessInstanceDaoImpl implements ProcessInstanceDao {
@Autowired
private ProcessInstanceMapper processInstanceMapper;
@Override
public int insertProcessInstance(ProcessInstance processInstance) {
return processInstanceMapper.insert(processInstance);
}
@Override
public int updateProcessInstance(ProcessInstance processInstance) {
return processInstanceMapper.updateById(processInstance);
}
@Override
public int upsertProcessInstance(@NonNull ProcessInstance processInstance) {
if (processInstance.getId() != 0) {
return updateProcessInstance(processInstance);
} else {
return insertProcessInstance(processInstance);
}
}
}
......@@ -24,7 +24,7 @@
warning_group_id, schedule_time, command_start_time, global_params, flag,
update_time, is_sub_process, executor_id, history_cmd,
process_instance_priority, worker_group,environment_code, timeout, tenant_id, var_pool,
dry_run, next_process_instance_id, restart_time
dry_run, next_process_instance_id, restart_time, state_history
</sql>
<select id="queryDetailById" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
......@@ -103,7 +103,7 @@
select instance.id, instance.command_type, instance.executor_id, instance.process_definition_version,
instance.process_definition_code, instance.name, instance.state, instance.schedule_time, instance.start_time,
instance.end_time, instance.run_times, instance.recovery, instance.host, instance.dry_run ,instance.next_process_instance_id,
restart_time
restart_time, instance.state_history
from t_ds_process_instance instance
join t_ds_process_definition define ON instance.process_definition_code = define.code
where instance.is_sub_process=0
......
......@@ -584,6 +584,7 @@ CREATE TABLE t_ds_process_instance
process_definition_version int(11) DEFAULT NULL,
process_definition_code bigint(20) not NULL,
state tinyint(4) DEFAULT NULL,
state_history text,
recovery tinyint(4) DEFAULT NULL,
start_time datetime DEFAULT NULL,
end_time datetime DEFAULT NULL,
......
......@@ -586,6 +586,7 @@ CREATE TABLE `t_ds_process_instance` (
`process_definition_code` bigint(20) NOT NULL COMMENT 'process definition code',
`process_definition_version` int(11) DEFAULT '0' COMMENT 'process definition version',
`state` tinyint(4) DEFAULT NULL COMMENT 'process instance Status: 0 commit succeeded, 1 running, 2 prepare to pause, 3 pause, 4 prepare to stop, 5 stop, 6 fail, 7 succeed, 8 need fault tolerance, 9 kill, 10 wait for thread, 11 wait for dependency to complete',
`state_history` text DEFAULT NULL COMMENT 'state history desc',
`recovery` tinyint(4) DEFAULT NULL COMMENT 'process instance failover flag:0:normal,1:failover instance',
`start_time` datetime DEFAULT NULL COMMENT 'process instance start time',
`end_time` datetime DEFAULT NULL COMMENT 'process instance end time',
......
......@@ -512,6 +512,7 @@ CREATE TABLE t_ds_process_instance (
process_definition_code bigint DEFAULT NULL ,
process_definition_version int DEFAULT NULL ,
state int DEFAULT NULL ,
state_history text,
recovery int DEFAULT NULL ,
start_time timestamp DEFAULT NULL ,
end_time timestamp DEFAULT NULL ,
......
......@@ -18,3 +18,5 @@
ALTER TABLE `t_ds_worker_group` ADD COLUMN `worker_group_extra_param` text DEFAULT NULL COMMENT 'extra params, e.g. default address filter list..';
ALTER TABLE `t_ds_process_instance` ADD COLUMN `state_history` text DEFAULT NULL COMMENT 'state history desc' AFTER `state`;
......@@ -37,6 +37,7 @@ BEGIN
--- add column
EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_worker_group ADD COLUMN IF NOT EXISTS worker_group_extra_param int DEFAULT NULL ';
EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_process_isntance ADD COLUMN IF NOT EXISTS state_history text DEFAULT NULL ';
return 'Success!';
......
......@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.SlotCheckState;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
......@@ -30,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
......@@ -47,6 +49,7 @@ import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -69,6 +72,9 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
@Autowired
private ProcessService processService;
@Autowired
private ProcessInstanceDao processInstanceDao;
@Autowired
private MasterConfig masterConfig;
......@@ -115,7 +121,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
*/
public void init() {
this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils
.newDaemonFixedThreadExecutor("MasterPreExecThread", masterConfig.getPreExecThreads());
.newDaemonFixedThreadExecutor("MasterPreExecThread", masterConfig.getPreExecThreads());
this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
}
......@@ -145,7 +151,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
}
// todo: if the workflow event queue is much, we need to handle the back pressure
boolean isOverload =
OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
if (isOverload) {
MasterServerMetrics.incMasterOverload();
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
......@@ -171,18 +177,19 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
if (processInstanceExecCacheManager.contains(processInstance.getId())) {
logger.error(
"The workflow instance is already been cached, this case shouldn't be happened");
"The workflow instance is already been cached, this case shouldn't be happened");
}
WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(processInstance,
processService,
nettyExecutorManager,
processAlertManager,
masterConfig,
stateWheelExecuteThread,
curingGlobalParamsService);
processService,
processInstanceDao,
nettyExecutorManager,
processAlertManager,
masterConfig,
stateWheelExecuteThread,
curingGlobalParamsService);
processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable);
workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
processInstance.getId()));
processInstance.getId()));
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
......@@ -232,25 +239,25 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
// make sure to finish handling command each time before next scan
latch.await();
logger.info(
"Master schedule bootstrap transformed command to ProcessInstance, commandSize: {}, processInstanceSize: {}",
commands.size(), processInstances.size());
"Master schedule bootstrap transformed command to ProcessInstance, commandSize: {}, processInstanceSize: {}",
commands.size(), processInstances.size());
ProcessInstanceMetrics
.recordProcessInstanceGenerateTime(System.currentTimeMillis() - commandTransformStartTime);
.recordProcessInstanceGenerateTime(System.currentTimeMillis() - commandTransformStartTime);
return processInstances;
}
private void sendRpcCommand(ProcessInstance processInstance) {
ProcessDefinition processDefinition = processInstance.getProcessDefinition();
if (processDefinition.getExecutionType().typeIsSerialPriority()){
if (processDefinition.getExecutionType().typeIsSerialPriority()) {
List<ProcessInstance> runningProcessInstances =
processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(), new int[]{ExecutionStatus.READY_STOP.getCode()},
processInstance.getId());
processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(), new int[] {ExecutionStatus.READY_STOP.getCode()},
processInstance.getId());
for (ProcessInstance runningProcessInstance : runningProcessInstances) {
StateEventChangeCommand workflowStateEventChangeCommand =
new StateEventChangeCommand(
runningProcessInstance.getId(), 0, runningProcessInstance.getState(), runningProcessInstance.getId(), 0);
new StateEventChangeCommand(
runningProcessInstance.getId(), 0, runningProcessInstance.getState(), runningProcessInstance.getId(), 0);
Host host = new Host(runningProcessInstance.getHost());
stateEventCallbackService.sendResult(host, workflowStateEventChangeCommand.convert2Command());
}
......@@ -269,11 +276,11 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
int pageNumber = 0;
int pageSize = masterConfig.getFetchCommandNum();
final List<Command> result =
processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
if (CollectionUtils.isNotEmpty(result)) {
logger.info(
"Master schedule bootstrap loop command success, command size: {}, current slot: {}, total slot size: {}",
result.size(), thisMasterSlot, masterCount);
"Master schedule bootstrap loop command success, command size: {}, current slot: {}, total slot size: {}",
result.size(), thisMasterSlot, masterCount);
}
ProcessInstanceMetrics.recordCommandQueryTime(System.currentTimeMillis() - scheduleStartTime);
return result;
......
......@@ -57,6 +57,7 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
......@@ -121,6 +122,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
private final ProcessService processService;
private ProcessInstanceDao processInstanceDao;
private final ProcessAlertManager processAlertManager;
private final NettyExecutorManager nettyExecutorManager;
......@@ -218,6 +221,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/**
* @param processInstance processInstance
* @param processService processService
* @param processInstanceDao processInstanceDao
* @param nettyExecutorManager nettyExecutorManager
* @param processAlertManager processAlertManager
* @param masterConfig masterConfig
......@@ -226,12 +230,14 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
public WorkflowExecuteRunnable(
@NonNull ProcessInstance processInstance,
@NonNull ProcessService processService,
@NonNull ProcessInstanceDao processInstanceDao,
@NonNull NettyExecutorManager nettyExecutorManager,
@NonNull ProcessAlertManager processAlertManager,
@NonNull MasterConfig masterConfig,
@NonNull StateWheelExecuteThread stateWheelExecuteThread,
@NonNull CuringParamsService curingParamsService) {
this.processService = processService;
this.processInstanceDao = processInstanceDao;
this.processInstance = processInstance;
this.nettyExecutorManager = nettyExecutorManager;
this.processAlertManager = processAlertManager;
......@@ -371,7 +377,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
// todo: merge the last taskInstance
processInstance.setVarPool(taskInstance.getVarPool());
processService.saveProcessInstance(processInstance);
processInstanceDao.upsertProcessInstance(processInstance);
if (!processInstance.isBlocked()) {
submitPostNode(Long.toString(taskInstance.getTaskCode()));
}
......@@ -889,7 +895,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
processInstance.getScheduleTime(),
cmdParam.get(Constants.SCHEDULE_TIMEZONE));
processInstance.setGlobalParams(globalParams);
processService.updateProcessInstance(processInstance);
processInstanceDao.updateProcessInstance(processInstance);
}
}
}
......@@ -1675,15 +1681,15 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
originStates,
newStates);
processInstance.setState(newStates);
processInstance.setStateWithDesc(newStates, "update by workflow executor");
if (newStates.typeIsFinished()) {
processInstance.setEndTime(new Date());
}
try {
processService.updateProcessInstance(processInstance);
processInstanceDao.updateProcessInstance(processInstance);
} catch (Exception ex) {
// recover the status
processInstance.setState(originStates);
processInstance.setStateWithDesc(originStates, "recover state by DB error");
processInstance.setEndTime(null);
throw new StateEventHandleException("Update process instance status to DB error", ex);
}
......
......@@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
......@@ -115,6 +116,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
protected ProcessService processService;
protected ProcessInstanceDao processInstanceDao;
protected MasterConfig masterConfig;
protected TaskPluginManager taskPluginManager;
......@@ -128,6 +131,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
@Override
public void init(@NonNull TaskInstance taskInstance, @NonNull ProcessInstance processInstance) {
processService = SpringApplicationContext.getBean(ProcessService.class);
processInstanceDao = SpringApplicationContext.getBean(ProcessInstanceDao.class);
masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
taskPluginManager = SpringApplicationContext.getBean(TaskPluginManager.class);
curingParamsService = SpringApplicationContext.getBean(CuringParamsService.class);
......
......@@ -189,7 +189,7 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
logger.info("blocking opportunity: expected-->{}, actual-->{}", expected, this.conditionResult);
processInstance.setBlocked(isBlocked);
if (isBlocked) {
processInstance.setState(ExecutionStatus.READY_BLOCK);
processInstance.setStateWithDesc(ExecutionStatus.READY_BLOCK, "ready block");
}
taskInstance.setState(status);
taskInstance.setEndTime(new Date());
......
......@@ -166,8 +166,8 @@ public class SubTaskProcessor extends BaseTaskProcessor {
if (subProcessInstance == null || taskInstance.getState().typeIsFinished()) {
return false;
}
subProcessInstance.setState(ExecutionStatus.READY_PAUSE);
processService.updateProcessInstance(subProcessInstance);
subProcessInstance.setStateWithDesc(ExecutionStatus.READY_PAUSE, "ready pause sub workflow");
processInstanceDao.updateProcessInstance(subProcessInstance);
sendToSubProcess();
return true;
}
......@@ -201,8 +201,8 @@ public class SubTaskProcessor extends BaseTaskProcessor {
if (subProcessInstance == null || taskInstance.getState().typeIsFinished()) {
return false;
}
subProcessInstance.setState(ExecutionStatus.READY_STOP);
processService.updateProcessInstance(subProcessInstance);
subProcessInstance.setStateWithDesc(ExecutionStatus.READY_STOP, "ready stop by kill task");
processInstanceDao.updateProcessInstance(subProcessInstance);
sendToSubProcess();
return true;
}
......
......@@ -27,6 +27,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.utils.DateUtils;
......@@ -79,6 +80,8 @@ public class WorkflowExecuteRunnableTest {
private ProcessService processService;
private ProcessInstanceDao processInstanceDao;
private final int processDefinitionId = 1;
private MasterConfig config;
......@@ -101,6 +104,8 @@ public class WorkflowExecuteRunnableTest {
processService = mock(ProcessService.class);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
processInstanceDao = mock(ProcessInstanceDao.class);
processInstance = mock(ProcessInstance.class);
Mockito.when(processInstance.getState()).thenReturn(ExecutionStatus.SUCCESS);
Mockito.when(processInstance.getHistoryCmd()).thenReturn(CommandType.COMPLEMENT_DATA.toString());
......@@ -120,7 +125,7 @@ public class WorkflowExecuteRunnableTest {
NettyExecutorManager nettyExecutorManager = mock(NettyExecutorManager.class);
ProcessAlertManager processAlertManager = mock(ProcessAlertManager.class);
workflowExecuteThread =
PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, nettyExecutorManager, processAlertManager, config, stateWheelExecuteThread, curingGlobalParamsService));
PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, processInstanceDao, nettyExecutorManager, processAlertManager, config, stateWheelExecuteThread, curingGlobalParamsService));
// prepareProcess init dag
Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag");
dag.setAccessible(true);
......
......@@ -125,8 +125,6 @@ public interface ProcessService {
ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ProcessInstance processInstance);
void saveProcessInstance(ProcessInstance processInstance);
int saveCommand(Command command);
boolean saveTaskInstance(TaskInstance taskInstance);
......@@ -161,8 +159,6 @@ public interface ProcessService {
ProcessInstance findParentProcessInstance(Integer subProcessId);
int updateProcessInstance(ProcessInstance processInstance);
void changeOutParam(TaskInstance taskInstance);
Schedule querySchedule(int id);
......@@ -184,8 +180,6 @@ public interface ProcessService {
DataSource findDataSourceById(int id);
int updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus);
ProcessInstance findProcessInstanceByTaskId(int taskId);
List<UdfFunc> queryUdfFunListByIds(Integer[] ids);
......
......@@ -101,6 +101,7 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.dao.utils.DqRuleUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
......@@ -175,9 +176,13 @@ public class ProcessServiceImpl implements ProcessService {
@Autowired
private ProcessDefinitionLogMapper processDefineLogMapper;
// todo replace with processInstanceDao
@Autowired
private ProcessInstanceMapper processInstanceMapper;
@Autowired
private ProcessInstanceDao processInstanceDao;
@Autowired
private DataSourceMapper dataSourceMapper;
......@@ -299,7 +304,7 @@ public class ProcessServiceImpl implements ProcessService {
return null;
}
} else {
saveProcessInstance(processInstance);
processInstanceDao.upsertProcessInstance(processInstance);
}
setSubProcessParam(processInstance);
deleteCommandWithCheck(command.getId());
......@@ -307,23 +312,23 @@ public class ProcessServiceImpl implements ProcessService {
}
protected void saveSerialProcess(ProcessInstance processInstance, ProcessDefinition processDefinition) {
processInstance.setState(ExecutionStatus.SERIAL_WAIT);
saveProcessInstance(processInstance);
processInstance.setStateWithDesc(ExecutionStatus.SERIAL_WAIT, "wait by serial_wait strategy");
processInstanceDao.upsertProcessInstance(processInstance);
//serial wait
//when we get the running instance(or waiting instance) only get the priority instance(by id)
if (processDefinition.getExecutionType().typeIsSerialWait()) {
List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE, processInstance.getId());
if (CollectionUtils.isEmpty(runningProcessInstances)) {
processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
saveProcessInstance(processInstance);
processInstance.setStateWithDesc(ExecutionStatus.SUBMITTED_SUCCESS, "submit from serial_wait strategy");
processInstanceDao.upsertProcessInstance(processInstance);
}
} else if (processDefinition.getExecutionType().typeIsSerialDiscard()) {
List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE, processInstance.getId());
if (CollectionUtils.isEmpty(runningProcessInstances)) {
processInstance.setState(ExecutionStatus.STOP);
saveProcessInstance(processInstance);
processInstance.setStateWithDesc(ExecutionStatus.STOP, "stop by serial_discard strategy");
processInstanceDao.upsertProcessInstance(processInstance);
}
} else if (processDefinition.getExecutionType().typeIsSerialPriority()) {
List<ProcessInstance> runningProcessInstances =
......@@ -334,12 +339,12 @@ public class ProcessServiceImpl implements ProcessService {
for (ProcessInstance info : runningProcessInstances) {
info.setCommandType(CommandType.STOP);
info.addHistoryCmd(CommandType.STOP);
info.setState(ExecutionStatus.READY_STOP);
processService.updateProcessInstance(info);
info.setStateWithDesc(ExecutionStatus.READY_STOP, "ready stop by serial_priority strategy");
processInstanceDao.updateProcessInstance(info);
// TODO rpc after command handle complete, but this is not better
}
processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
saveProcessInstance(processInstance);
processInstance.setStateWithDesc(ExecutionStatus.SUBMITTED_SUCCESS, "submit by serial_priority strategy");
processInstanceDao.upsertProcessInstance(processInstance);
}
}
......@@ -357,24 +362,6 @@ public class ProcessServiceImpl implements ProcessService {
this.commandMapper.deleteById(command.getId());
}
/**
* set process waiting thread
*
* @param command command
* @param processInstance processInstance
* @return process instance
*/
private ProcessInstance setWaitingThreadProcess(Command command, ProcessInstance processInstance) {
processInstance.setState(ExecutionStatus.WAITING_THREAD);
if (command.getCommandType() != CommandType.RECOVER_WAITING_THREAD) {
processInstance.addHistoryCmd(command.getCommandType());
}
saveProcessInstance(processInstance);
this.setSubProcessParam(processInstance);
createRecoveryWaitingThreadCommand(command, processInstance);
return null;
}
/**
* insert one command
*
......@@ -739,7 +726,7 @@ public class ProcessServiceImpl implements ProcessService {
ProcessInstance processInstance = new ProcessInstance(processDefinition);
processInstance.setProcessDefinitionCode(processDefinition.getCode());
processInstance.setProcessDefinitionVersion(processDefinition.getVersion());
processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
processInstance.setStateWithDesc(ExecutionStatus.RUNNING_EXECUTION, "init running");
processInstance.setRecovery(Flag.NO);
processInstance.setStartTime(new Date());
// the new process instance restart time is null.
......@@ -1038,7 +1025,7 @@ public class ProcessServiceImpl implements ProcessService {
default:
break;
}
processInstance.setState(runStatus);
processInstance.setStateWithDesc(runStatus, commandType.getDescp());
return processInstance;
}
......@@ -1150,7 +1137,7 @@ public class ProcessServiceImpl implements ProcessService {
paramMap.put(CMD_PARAM_SUB_PROCESS, String.valueOf(subProcessInstance.getId()));
subProcessInstance.setCommandParam(JSONUtils.toJsonString(paramMap));
subProcessInstance.setIsSubProcess(Flag.YES);
this.saveProcessInstance(subProcessInstance);
processInstanceDao.upsertProcessInstance(subProcessInstance);
}
// copy parent instance user def params to sub process..
String parentInstanceId = paramMap.get(CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID);
......@@ -1159,7 +1146,7 @@ public class ProcessServiceImpl implements ProcessService {
if (parentInstance != null) {
subProcessInstance.setGlobalParams(joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams()));
subProcessInstance.setVarPool(joinVarPool(parentInstance.getVarPool(), subProcessInstance.getVarPool()));
this.saveProcessInstance(subProcessInstance);
processInstanceDao.upsertProcessInstance(subProcessInstance);
} else {
logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam);
}
......@@ -1492,8 +1479,8 @@ public class ProcessServiceImpl implements ProcessService {
*/
private void initSubInstanceState(ProcessInstance childInstance) {
if (childInstance != null) {
childInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
updateProcessInstance(childInstance);
childInstance.setStateWithDesc(ExecutionStatus.RUNNING_EXECUTION, "init sub workflow instance");
processInstanceDao.updateProcessInstance(childInstance);
}
}
......@@ -1624,24 +1611,6 @@ public class ProcessServiceImpl implements ProcessService {
return true;
}
/**
* insert or update work process instance to data base
*
* @param processInstance processInstance
*/
@Override
public void saveProcessInstance(ProcessInstance processInstance) {
if (processInstance == null) {
logger.error("save error, process instance is null!");
return;
}
if (processInstance.getId() != 0) {
processInstanceMapper.updateById(processInstance);
} else {
processInstanceMapper.insert(processInstance);
}
}
/**
* insert or update command
*
......@@ -1924,17 +1893,6 @@ public class ProcessServiceImpl implements ProcessService {
return processInstance;
}
/**
* update process instance
*
* @param processInstance processInstance
* @return update process instance result
*/
@Override
public int updateProcessInstance(ProcessInstance processInstance) {
return processInstanceMapper.updateById(processInstance);
}
/**
* for show in page of taskInstance
*/
......@@ -2104,20 +2062,6 @@ public class ProcessServiceImpl implements ProcessService {
return dataSourceMapper.selectById(id);
}
/**
* update process instance state by id
*
* @param processInstanceId processInstanceId
* @param executionStatus executionStatus
* @return update process result
*/
@Override
public int updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus) {
ProcessInstance instance = processInstanceMapper.selectById(processInstanceId);
instance.setState(executionStatus);
return processInstanceMapper.updateById(instance);
}
/**
* find process instance by the task id
*
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册