提交 9b698df7 编写于 作者: W WangJPLeo

cp [Fix-11465] RPC requests are placed after transaction commit

上级 553f781f
......@@ -26,7 +26,13 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
......@@ -94,6 +100,12 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread {
@Autowired
private WorkflowEventLooper workflowEventLooper;
@Autowired
private ProcessInstanceMapper processInstanceMapper;
@Autowired
private StateEventCallbackService stateEventCallbackService;
private String masterAddress;
protected MasterSchedulerBootstrap() {
......@@ -202,6 +214,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread {
if (processInstance != null) {
processInstances.add(processInstance);
logger.info("Master handle command {} end, create process instance {}", command.getId(), processInstance.getId());
sendRpcCommand(processInstance);
}
} catch (Exception e) {
logger.error("Master handle command {} error ", command.getId(), e);
......@@ -220,6 +233,24 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread {
return processInstances;
}
private void sendRpcCommand(ProcessInstance processInstance) {
ProcessDefinition processDefinition = processInstance.getProcessDefinition();
if (processDefinition.getExecutionType().typeIsSerialPriority()){
List<ProcessInstance> runningProcessInstances =
processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(), new int[]{ExecutionStatus.READY_STOP.getCode()},
processInstance.getId());
for (ProcessInstance runningProcessInstance : runningProcessInstances) {
StateEventChangeCommand workflowStateEventChangeCommand =
new StateEventChangeCommand(
runningProcessInstance.getId(), 0, runningProcessInstance.getState(), runningProcessInstance.getId(), 0);
Host host = new Host(runningProcessInstance.getHost());
stateEventCallbackService.sendResult(host, workflowStateEventChangeCommand.convert2Command());
}
}
}
private List<Command> findCommands() throws MasterException {
try {
long scheduleStartTime = System.currentTimeMillis();
......
......@@ -340,19 +340,8 @@ public class ProcessServiceImpl implements ProcessService {
info.setCommandType(CommandType.STOP);
info.addHistoryCmd(CommandType.STOP);
info.setState(ExecutionStatus.READY_STOP);
int update = updateProcessInstance(info);
// determine whether the process is normal
if (update > 0) {
StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
info.getId(), 0, info.getState(), info.getId(), 0
);
try {
Host host = new Host(info.getHost());
stateEventCallbackService.sendResult(host, stateEventChangeCommand.convert2Command());
} catch (Exception e) {
logger.error("sendResultError", e);
}
}
processService.updateProcessInstance(info);
// TODO rpc after command handle complete, but this is not better
}
processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
saveProcessInstance(processInstance);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册