diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java index a611c52f553264656a6e8e6f5e9cbe9ce8ca0f76..d42a94c848d31ddc8962ed0f445cab084101f634 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java @@ -109,58 +109,44 @@ public class ProcessDao { /** - * find one command from command queue, construct process instance + * handle Command (construct ProcessInstance from Command) , wrapped in transaction * @param logger logger * @param host host * @param validThreadNum validThreadNum + * @param command found command * @return process instance */ @Transactional(rollbackFor = Exception.class) - public ProcessInstance scanCommand(Logger logger, String host, int validThreadNum){ - - ProcessInstance processInstance = null; - Command command = findOneCommand(); - if (command == null) { + public ProcessInstance handleCommand(Logger logger, String host, int validThreadNum, Command command) { + ProcessInstance processInstance = constructProcessInstance(command, host); + //cannot construct process instance, return null; + if(processInstance == null){ + logger.error("scan command, command parameter is error: %s", command.toString()); + moveToErrorCommand(command, "process instance is null"); return null; } - logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString())); - - try{ - processInstance = constructProcessInstance(command, host); - //cannot construct process instance, return null; - if(processInstance == null){ - logger.error("scan command, command parameter is error: %s", command.toString()); - delCommandByid(command.getId()); - saveErrorCommand(command, "process instance is null"); - return null; - } - if(!checkThreadNum(command, validThreadNum)){ - logger.info("there is not enough thread for this command: {}",command.toString() ); - return setWaitingThreadProcess(command, processInstance); - } - processInstance.setCommandType(command.getCommandType()); - processInstance.addHistoryCmd(command.getCommandType()); - saveProcessInstance(processInstance); - this.setSubProcessParam(processInstance); - delCommandByid(command.getId()); - return processInstance; - }catch (Exception e){ - logger.error("scan command error ", e); - saveErrorCommand(command, e.toString()); - delCommandByid(command.getId()); + if(!checkThreadNum(command, validThreadNum)){ + logger.info("there is not enough thread for this command: {}",command.toString() ); + return setWaitingThreadProcess(command, processInstance); } - return null; + processInstance.setCommandType(command.getCommandType()); + processInstance.addHistoryCmd(command.getCommandType()); + saveProcessInstance(processInstance); + this.setSubProcessParam(processInstance); + delCommandByid(command.getId()); + return processInstance; } /** - * save error command + * save error command, and delete original command * @param command command * @param message message */ - private void saveErrorCommand(Command command, String message) { - + @Transactional(rollbackFor = Exception.class) + public void moveToErrorCommand(Command command, String message) { ErrorCommand errorCommand = new ErrorCommand(command, message); this.errorCommandMapper.insert(errorCommand); + delCommandByid(command.getId()); } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java index dc1c2fb75fcebcee795f9a1141d62f59cf79ad90..48ea415282e2bbeca290b7aa2d8cdceef334057b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.zk.AbstractZKClient; import org.apache.dolphinscheduler.dao.ProcessDao; +import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.commons.configuration.Configuration; @@ -108,10 +109,20 @@ public class MasterSchedulerThread implements Runnable { ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) masterExecService; int activeCount = poolExecutor.getActiveCount(); // make sure to scan and delete command table in one transaction - processInstance = processDao.scanCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount); - if (processInstance != null) { - logger.info("start master exex thread , split DAG ..."); - masterExecService.execute(new MasterExecThread(processInstance,processDao)); + Command command = processDao.findOneCommand(); + if (command != null) { + logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString())); + + try{ + processInstance = processDao.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command); + if (processInstance != null) { + logger.info("start master exec thread , split DAG ..."); + masterExecService.execute(new MasterExecThread(processInstance,processDao)); + } + }catch (Exception e){ + logger.error("scan command error ", e); + processDao.moveToErrorCommand(command, e.toString()); + } } } }