ProcessService.java 70.6 KB
Newer Older
L
ligang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17
package org.apache.dolphinscheduler.service.process;
Q
qiaozhanwei 已提交
18

L
ligang 已提交
19
import com.cronutils.model.Cron;
S
simon 已提交
20
import com.fasterxml.jackson.databind.node.ObjectNode;
21
import org.apache.commons.lang.ArrayUtils;
Q
qiaozhanwei 已提交
22 23 24 25 26 27
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
28
import org.apache.dolphinscheduler.common.utils.*;
Q
qiaozhanwei 已提交
29 30
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
31
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
L
ligang 已提交
32 33 34 35 36 37 38 39
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.util.*;
B
add esc  
baoliang 已提交
40
import java.util.stream.Collectors;
L
ligang 已提交
41

42
import static java.util.stream.Collectors.toSet;
Q
qiaozhanwei 已提交
43
import static org.apache.dolphinscheduler.common.Constants.*;
L
ligang 已提交
44 45 46 47 48

/**
 * process relative dao that some mappers in this.
 */
@Component
49
public class ProcessService {
L
ligang 已提交
50 51 52 53 54 55 56 57 58

    private final Logger logger = LoggerFactory.getLogger(getClass());

    private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
            ExecutionStatus.RUNNING_EXEUTION.ordinal(),
            ExecutionStatus.READY_PAUSE.ordinal(),
            ExecutionStatus.READY_STOP.ordinal()};

    @Autowired
59
    private UserMapper userMapper;
L
ligang 已提交
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87

    @Autowired
    private ProcessDefinitionMapper processDefineMapper;

    @Autowired
    private ProcessInstanceMapper processInstanceMapper;

    @Autowired
    private DataSourceMapper dataSourceMapper;

    @Autowired
    private ProcessInstanceMapMapper processInstanceMapMapper;

    @Autowired
    private TaskInstanceMapper taskInstanceMapper;

    @Autowired
    private CommandMapper commandMapper;

    @Autowired
    private ScheduleMapper scheduleMapper;

    @Autowired
    private UdfFuncMapper udfFuncMapper;

    @Autowired
    private ResourceMapper resourceMapper;

Q
qiaozhanwei 已提交
88

B
baoliang 已提交
89

B
baoliang 已提交
90 91 92
    @Autowired
    private ErrorCommandMapper errorCommandMapper;

93
    @Autowired
94
    private TenantMapper tenantMapper;
95

96
    @Autowired
97
    private  ProjectMapper projectMapper;
98

99
    /**
100
     * handle Command (construct ProcessInstance from Command) , wrapped in transaction
101 102 103
     * @param logger logger
     * @param host host
     * @param validThreadNum validThreadNum
104
     * @param command found command
105
     * @return process instance
106
     */
107
    @Transactional(rollbackFor = Exception.class)
108 109 110 111
    public ProcessInstance handleCommand(Logger logger, String host, int validThreadNum, Command command) {
        ProcessInstance processInstance = constructProcessInstance(command, host);
        //cannot construct process instance, return null;
        if(processInstance == null){
Q
qiaozhanwei 已提交
112
            logger.error("scan command, command parameter is error: {}", command);
113
            moveToErrorCommand(command, "process instance is null");
L
ligang 已提交
114 115
            return null;
        }
116
        if(!checkThreadNum(command, validThreadNum)){
Q
qiaozhanwei 已提交
117
            logger.info("there is not enough thread for this command: {}", command);
118
            return setWaitingThreadProcess(command, processInstance);
L
ligang 已提交
119
        }
120 121 122 123
        if (processInstance.getCommandType().equals(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS)){
            delCommandByid(command.getId());
            return null;
        }
124 125 126 127 128 129
        processInstance.setCommandType(command.getCommandType());
        processInstance.addHistoryCmd(command.getCommandType());
        saveProcessInstance(processInstance);
        this.setSubProcessParam(processInstance);
        delCommandByid(command.getId());
        return processInstance;
B
baoliang 已提交
130 131
    }

132
    /**
133
     * save error command, and delete original command
134 135 136
     * @param command command
     * @param message message
     */
137 138
    @Transactional(rollbackFor = Exception.class)
    public void moveToErrorCommand(Command command, String message) {
B
baoliang 已提交
139 140
        ErrorCommand errorCommand = new ErrorCommand(command, message);
        this.errorCommandMapper.insert(errorCommand);
141
        delCommandByid(command.getId());
B
baoliang 已提交
142 143
    }

B
baoliang 已提交
144 145
    /**
     * set process waiting thread
146 147 148
     * @param command command
     * @param processInstance processInstance
     * @return process instance
B
baoliang 已提交
149 150 151 152 153 154 155 156 157 158 159 160
     */
    private ProcessInstance setWaitingThreadProcess(Command command, ProcessInstance processInstance) {
        processInstance.setState(ExecutionStatus.WAITTING_THREAD);
        if(command.getCommandType() != CommandType.RECOVER_WAITTING_THREAD){
            processInstance.addHistoryCmd(command.getCommandType());
        }
        saveProcessInstance(processInstance);
        this.setSubProcessParam(processInstance);
        createRecoveryWaitingThreadCommand(command, processInstance);
        return null;
    }

161 162 163 164 165 166
    /**
     * check thread num
     * @param command command
     * @param validThreadNum validThreadNum
     * @return if thread is enough
     */
B
baoliang 已提交
167 168 169
    private boolean checkThreadNum(Command command, int validThreadNum) {
        int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionId());
        return validThreadNum >= commandThreadCount;
L
ligang 已提交
170 171 172 173
    }

    /**
     * insert one command
174 175
     * @param command command
     * @return create result
L
ligang 已提交
176 177 178 179 180 181 182 183 184 185 186
     */
    public int createCommand(Command command) {
        int result = 0;
        if (command != null){
            result = commandMapper.insert(command);
        }
        return result;
    }

    /**
     * find one command from queue list
187
     * @return command
L
ligang 已提交
188 189
     */
    public Command findOneCommand(){
B
bao liang 已提交
190
        return commandMapper.getOneToRun();
L
ligang 已提交
191 192 193 194
    }

    /**
     * check the input command exists in queue list
195 196
     * @param command command
     * @return create command result
L
ligang 已提交
197 198 199 200 201 202 203 204 205 206
     */
    public Boolean verifyIsNeedCreateCommand(Command command){
        Boolean isNeedCreate = true;
        Map<CommandType,Integer> cmdTypeMap = new HashMap<CommandType,Integer>();
        cmdTypeMap.put(CommandType.REPEAT_RUNNING,1);
        cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS,1);
        cmdTypeMap.put(CommandType.START_FAILURE_TASK_PROCESS,1);
        CommandType commandType = command.getCommandType();

        if(cmdTypeMap.containsKey(commandType)){
S
simon 已提交
207 208
            ObjectNode cmdParamObj = JSONUtils.parseObject(command.getCommandParam());
            int processInstanceId = cmdParamObj.path(CMDPARAM_RECOVER_PROCESS_ID_STRING).asInt();
L
ligang 已提交
209

210
            List<Command> commands = commandMapper.selectList(null);
Q
qiaozhanwei 已提交
211
            // for all commands
L
ligang 已提交
212 213
            for (Command tmpCommand:commands){
                if(cmdTypeMap.containsKey(tmpCommand.getCommandType())){
214
                    ObjectNode tempObj = JSONUtils.parseObject(tmpCommand.getCommandParam());
S
simon 已提交
215
                    if(tempObj != null && processInstanceId == tempObj.path(CMDPARAM_RECOVER_PROCESS_ID_STRING).asInt()){
L
ligang 已提交
216 217 218 219 220 221 222 223 224 225 226
                        isNeedCreate = false;
                        break;
                    }
                }
            }
        }
        return  isNeedCreate;
    }

    /**
     * find process instance detail by id
227 228
     * @param processId processId
     * @return process instance
L
ligang 已提交
229 230 231 232 233
     */
    public ProcessInstance findProcessInstanceDetailById(int processId){
        return processInstanceMapper.queryDetailById(processId);
    }

_和's avatar
_和 已提交
234 235 236 237 238
    /**
     * get task node list by definitionId
     * @param defineId
     * @return
     */
239
    public  List<TaskNode> getTaskNodeListByDefinitionId(Integer defineId){
_和's avatar
_和 已提交
240 241 242 243 244 245 246 247 248 249 250 251
        ProcessDefinition processDefinition = processDefineMapper.selectById(defineId);
        if (processDefinition == null) {
            logger.info("process define not exists");
            return null;
        }

        String processDefinitionJson = processDefinition.getProcessDefinitionJson();
        ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);

        //process data check
        if (null == processData) {
            logger.error("process data is null");
252
            return null;
_和's avatar
_和 已提交
253 254 255 256 257
        }

        return processData.getTasks();
    }

L
ligang 已提交
258 259
    /**
     * find process instance by id
260 261
     * @param processId processId
     * @return process instance
L
ligang 已提交
262 263
     */
    public ProcessInstance findProcessInstanceById(int processId){
B
bao liang 已提交
264
        return processInstanceMapper.selectById(processId);
L
ligang 已提交
265 266 267 268
    }

    /**
     * find process define by id.
269 270
     * @param processDefinitionId processDefinitionId
     * @return process definition
L
ligang 已提交
271 272
     */
    public ProcessDefinition findProcessDefineById(int processDefinitionId) {
B
bao liang 已提交
273
        return processDefineMapper.selectById(processDefinitionId);
L
ligang 已提交
274 275 276 277
    }

    /**
     * delete work process instance by id
278 279
     * @param processInstanceId processInstanceId
     * @return delete process instance result
L
ligang 已提交
280 281
     */
    public int deleteWorkProcessInstanceById(int processInstanceId){
B
bao liang 已提交
282
        return processInstanceMapper.deleteById(processInstanceId);
L
ligang 已提交
283 284 285 286
    }

    /**
     * delete all sub process by parent instance id
287 288
     * @param processInstanceId processInstanceId
     * @return delete all sub process instance result
L
ligang 已提交
289 290 291
     */
    public int deleteAllSubWorkProcessByParentId(int processInstanceId){

B
bao liang 已提交
292
        List<Integer> subProcessIdList = processInstanceMapMapper.querySubIdListByParentId(processInstanceId);
L
ligang 已提交
293

294
        for(Integer subId : subProcessIdList ){
L
ligang 已提交
295 296 297 298 299 300 301 302 303 304
            deleteAllSubWorkProcessByParentId(subId);
            deleteWorkProcessMapByParentId(subId);
            deleteWorkProcessInstanceById(subId);
        }
        return 1;
    }


    /**
     * calculate sub process number in the process define.
305 306
     * @param processDefinitionId processDefinitionId
     * @return process thread num count
L
ligang 已提交
307 308
     */
    private Integer workProcessThreadNumCount(Integer processDefinitionId){
B
bao liang 已提交
309
        List<Integer> ids = new ArrayList<>();
L
ligang 已提交
310 311 312 313 314 315
        recurseFindSubProcessId(processDefinitionId, ids);
        return ids.size()+1;
    }

    /**
     * recursive query sub process definition id by parent id.
316 317
     * @param parentId parentId
     * @param ids ids
L
ligang 已提交
318
     */
B
bao liang 已提交
319 320
    public void recurseFindSubProcessId(int parentId, List<Integer> ids){
        ProcessDefinition processDefinition = processDefineMapper.selectById(parentId);
L
ligang 已提交
321 322 323 324 325 326 327 328 329 330
        String processDefinitionJson = processDefinition.getProcessDefinitionJson();

        ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);

        List<TaskNode> taskNodeList = processData.getTasks();

        if (taskNodeList != null && taskNodeList.size() > 0){

            for (TaskNode taskNode : taskNodeList){
                String parameter = taskNode.getParams();
331
                if (parameter.contains(CMDPARAM_SUB_PROCESS_DEFINE_ID)){
张世鸣 已提交
332
                    SubProcessParameters subProcessParam = JSONUtils.parseObject(parameter, SubProcessParameters.class);
B
bao liang 已提交
333
                    ids.add(subProcessParam.getProcessDefinitionId());
L
ligang 已提交
334 335 336 337 338 339 340 341 342 343 344
                    recurseFindSubProcessId(subProcessParam.getProcessDefinitionId(),ids);
                }
            }
        }
    }

    /**
     * create recovery waiting thread command when thread pool is not enough for the process instance.
     * sub work process instance need not to create recovery command.
     * create recovery waiting thread  command and delete origin command at the same time.
     * if the recovery command is exists, only update the field update_time
345 346
     * @param originCommand originCommand
     * @param processInstance processInstance
L
ligang 已提交
347 348 349 350 351 352
     */
    public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) {

        // sub process doesnot need to create wait command
        if(processInstance.getIsSubProcess() == Flag.YES){
            if(originCommand != null){
B
bao liang 已提交
353
                commandMapper.deleteById(originCommand.getId());
L
ligang 已提交
354 355 356 357 358 359 360 361 362 363 364 365 366
            }
            return;
        }
        Map<String, String> cmdParam = new HashMap<>();
        cmdParam.put(Constants.CMDPARAM_RECOVERY_WAITTING_THREAD, String.valueOf(processInstance.getId()));
        // process instance quit by "waiting thread" state
        if(originCommand == null){
            Command command = new Command(
                    CommandType.RECOVER_WAITTING_THREAD,
                    processInstance.getTaskDependType(),
                    processInstance.getFailureStrategy(),
                    processInstance.getExecutorId(),
                    processInstance.getProcessDefinitionId(),
张世鸣 已提交
367
                    JSONUtils.toJsonString(cmdParam),
L
ligang 已提交
368 369 370 371 372 373 374 375 376 377 378 379 380 381 382
                    processInstance.getWarningType(),
                    processInstance.getWarningGroupId(),
                    processInstance.getScheduleTime(),
                    processInstance.getProcessInstancePriority()
            );
            saveCommand(command);
            return ;
        }

        // update the command time if current command if recover from waiting
        if(originCommand.getCommandType() == CommandType.RECOVER_WAITTING_THREAD){
            originCommand.setUpdateTime(new Date());
            saveCommand(originCommand);
        }else{
            // delete old command and create new waiting thread command
B
bao liang 已提交
383
            commandMapper.deleteById(originCommand.getId());
L
ligang 已提交
384 385 386
            originCommand.setId(0);
            originCommand.setCommandType(CommandType.RECOVER_WAITTING_THREAD);
            originCommand.setUpdateTime(new Date());
张世鸣 已提交
387
            originCommand.setCommandParam(JSONUtils.toJsonString(cmdParam));
L
ligang 已提交
388 389 390 391 392 393 394
            originCommand.setProcessInstancePriority(processInstance.getProcessInstancePriority());
            saveCommand(originCommand);
        }
    }

    /**
     * get schedule time from command
395 396 397
     * @param command command
     * @param cmdParam cmdParam map
     * @return date
L
ligang 已提交
398 399 400 401 402 403 404 405 406 407 408 409 410
     */
    private Date getScheduleTime(Command command, Map<String, String> cmdParam){
        Date scheduleTime = command.getScheduleTime();
        if(scheduleTime == null){
            if(cmdParam != null && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)){
                scheduleTime = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
            }
        }
        return scheduleTime;
    }

    /**
     * generate a new work process instance from command.
411 412 413 414
     * @param processDefinition processDefinition
     * @param command command
     * @param cmdParam cmdParam map
     * @return process instance
L
ligang 已提交
415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455
     */
    private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition,
                                                       Command command,
                                                       Map<String, String> cmdParam){
        ProcessInstance processInstance = new ProcessInstance(processDefinition);
        processInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
        processInstance.setRecovery(Flag.NO);
        processInstance.setStartTime(new Date());
        processInstance.setRunTimes(1);
        processInstance.setMaxTryTimes(0);
        processInstance.setProcessDefinitionId(command.getProcessDefinitionId());
        processInstance.setCommandParam(command.getCommandParam());
        processInstance.setCommandType(command.getCommandType());
        processInstance.setIsSubProcess(Flag.NO);
        processInstance.setTaskDependType(command.getTaskDependType());
        processInstance.setFailureStrategy(command.getFailureStrategy());
        processInstance.setExecutorId(command.getExecutorId());
        WarningType warningType = command.getWarningType() == null ? WarningType.NONE : command.getWarningType();
        processInstance.setWarningType(warningType);
        Integer warningGroupId = command.getWarningGroupId() == null ? 0 : command.getWarningGroupId();
        processInstance.setWarningGroupId(warningGroupId);

        // schedule time
        Date scheduleTime = getScheduleTime(command, cmdParam);
        if(scheduleTime != null){
            processInstance.setScheduleTime(scheduleTime);
        }
        processInstance.setCommandStartTime(command.getStartTime());
        processInstance.setLocations(processDefinition.getLocations());
        processInstance.setConnects(processDefinition.getConnects());
        // curing global params
        processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
                processDefinition.getGlobalParamMap(),
                processDefinition.getGlobalParamList(),
                getCommandTypeIfComplement(processInstance, command),
                processInstance.getScheduleTime()));

        //copy process define json to process instance
        processInstance.setProcessInstanceJson(processDefinition.getProcessDefinitionJson());
        // set process instance priority
        processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
L
lgcareer 已提交
456 457
        String workerGroup = StringUtils.isBlank(command.getWorkerGroup()) ? Constants.DEFAULT_WORKER_GROUP : command.getWorkerGroup();
        processInstance.setWorkerGroup(workerGroup);
B
baoliang 已提交
458
        processInstance.setTimeout(processDefinition.getTimeout());
459
        processInstance.setTenantId(processDefinition.getTenantId());
L
ligang 已提交
460 461 462
        return processInstance;
    }

463 464 465 466 467
    /**
     * get process tenant
     * there is tenant id in definition, use the tenant of the definition.
     * if there is not tenant id in the definiton or the tenant not exist
     * use definition creator's tenant.
468 469 470
     * @param tenantId tenantId
     * @param userId userId
     * @return tenant
471 472 473 474 475 476
     */
    public Tenant getTenantForProcess(int tenantId, int userId){
        Tenant tenant = null;
        if(tenantId >= 0){
            tenant = tenantMapper.queryById(tenantId);
        }
477 478 479 480 481

        if (userId == 0){
            return null;
        }

482
        if(tenant == null){
B
bao liang 已提交
483
            User user = userMapper.selectById(userId);
484
            tenant = tenantMapper.queryById(user.getTenantId());
485 486 487
        }
        return tenant;
    }
L
ligang 已提交
488 489 490

    /**
     * check command parameters is valid
491 492 493
     * @param command command
     * @param cmdParam cmdParam map
     * @return whether command param is valid
L
ligang 已提交
494 495 496 497 498 499
     */
    private Boolean checkCmdParam(Command command, Map<String, String> cmdParam){
        if(command.getTaskDependType() == TaskDependType.TASK_ONLY || command.getTaskDependType()== TaskDependType.TASK_PRE){
            if(cmdParam == null
                    || !cmdParam.containsKey(Constants.CMDPARAM_START_NODE_NAMES)
                    || cmdParam.get(Constants.CMDPARAM_START_NODE_NAMES).isEmpty()){
500
                logger.error("command node depend type is {}, but start nodes is null ", command.getTaskDependType());
L
ligang 已提交
501 502 503 504 505 506 507 508
                return false;
            }
        }
        return true;
    }

    /**
     * construct process instance according to one command.
509 510 511
     * @param command command
     * @param host host
     * @return process instance
L
ligang 已提交
512 513 514 515 516 517 518 519 520
     */
    private ProcessInstance constructProcessInstance(Command command, String host){

        ProcessInstance processInstance = null;
        CommandType commandType = command.getCommandType();
        Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());

        ProcessDefinition processDefinition = null;
        if(command.getProcessDefinitionId() != 0){
B
bao liang 已提交
521
            processDefinition = processDefineMapper.selectById(command.getProcessDefinitionId());
L
ligang 已提交
522
            if(processDefinition == null){
523
                logger.error("cannot find the work process define! define id : {}", command.getProcessDefinitionId());
L
ligang 已提交
524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551
                return null;
            }
        }

        if(cmdParam != null ){
            Integer processInstanceId = 0;
            // recover from failure or pause tasks
            if(cmdParam.containsKey(Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING)) {
                String processId = cmdParam.get(Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING);
                processInstanceId = Integer.parseInt(processId);
                if (processInstanceId == 0) {
                    logger.error("command parameter is error, [ ProcessInstanceId ] is 0");
                    return null;
                }
            }else if(cmdParam.containsKey(Constants.CMDPARAM_SUB_PROCESS)){
                // sub process map
                String pId = cmdParam.get(Constants.CMDPARAM_SUB_PROCESS);
                processInstanceId = Integer.parseInt(pId);
            }else if(cmdParam.containsKey(Constants.CMDPARAM_RECOVERY_WAITTING_THREAD)){
                // waiting thread command
                String pId = cmdParam.get(Constants.CMDPARAM_RECOVERY_WAITTING_THREAD);
                processInstanceId = Integer.parseInt(pId);
            }
            if(processInstanceId ==0){
                processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
            }else{
                processInstance = this.findProcessInstanceDetailById(processInstanceId);
            }
B
bao liang 已提交
552
            processDefinition = processDefineMapper.selectById(processInstance.getProcessDefinitionId());
L
ligang 已提交
553 554 555 556 557
            processInstance.setProcessDefinition(processDefinition);

            //reset command parameter
            if(processInstance.getCommandParam() != null){
                Map<String, String> processCmdParam = JSONUtils.toMap(processInstance.getCommandParam());
558 559 560
                for(Map.Entry<String, String> entry: processCmdParam.entrySet()) {
                    if(!cmdParam.containsKey(entry.getKey())){
                        cmdParam.put(entry.getKey(), entry.getValue());
L
ligang 已提交
561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580
                    }
                }
            }
            // reset command parameter if sub process
            if(cmdParam.containsKey(Constants.CMDPARAM_SUB_PROCESS)){
                processInstance.setCommandParam(command.getCommandParam());
            }
        }else{
            // generate one new process instance
            processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
        }
        if(!checkCmdParam(command, cmdParam)){
            logger.error("command parameter check failed!");
            return null;
        }

        if(command.getScheduleTime() != null){
            processInstance.setScheduleTime(command.getScheduleTime());
        }
        processInstance.setHost(host);
581 582

        ExecutionStatus runStatus = ExecutionStatus.RUNNING_EXEUTION;
L
ligang 已提交
583 584 585 586 587 588 589
        int runTime = processInstance.getRunTimes();
        switch (commandType){
            case START_PROCESS:
                break;
            case START_FAILURE_TASK_PROCESS:
                // find failed tasks and init these tasks
                List<Integer> failedList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.FAILURE);
590
                List<Integer> toleranceList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.NEED_FAULT_TOLERANCE);
L
ligang 已提交
591 592 593 594
                List<Integer> killedList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.KILL);
                cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING);

                failedList.addAll(killedList);
595
                failedList.addAll(toleranceList);
L
ligang 已提交
596 597 598 599 600
                for(Integer taskId : failedList){
                    initTaskInstance(this.findTaskInstanceById(taskId));
                }
                cmdParam.put(Constants.CMDPARAM_RECOVERY_START_NODE_STRING,
                        String.join(Constants.COMMA, convertIntListToString(failedList)));
张世鸣 已提交
601
                processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
L
ligang 已提交
602 603 604 605 606 607 608 609 610 611
                processInstance.setRunTimes(runTime +1 );
                break;
            case START_CURRENT_TASK_PROCESS:
                break;
            case RECOVER_WAITTING_THREAD:
                break;
            case RECOVER_SUSPENDED_PROCESS:
                // find pause tasks and init task's state
                cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING);
                List<Integer> suspendedNodeList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.PAUSE);
leon-baoliang's avatar
leon-baoliang 已提交
612 613 614
                List<Integer> stopNodeList = findTaskIdByInstanceState(processInstance.getId(),
                        ExecutionStatus.KILL);
                suspendedNodeList.addAll(stopNodeList);
L
ligang 已提交
615
                for(Integer taskId : suspendedNodeList){
Q
qiaozhanwei 已提交
616
                    // initialize the pause state
L
ligang 已提交
617 618 619
                    initTaskInstance(this.findTaskInstanceById(taskId));
                }
                cmdParam.put(Constants.CMDPARAM_RECOVERY_START_NODE_STRING, String.join(",", convertIntListToString(suspendedNodeList)));
张世鸣 已提交
620
                processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
L
ligang 已提交
621 622 623 624 625
                processInstance.setRunTimes(runTime +1);
                break;
            case RECOVER_TOLERANCE_FAULT_PROCESS:
                // recover tolerance fault process
                processInstance.setRecovery(Flag.YES);
626
                runStatus = processInstance.getState();
L
ligang 已提交
627 628 629 630 631 632 633 634 635 636 637 638 639
                break;
            case COMPLEMENT_DATA:
                // delete all the valid tasks when complement data
                List<TaskInstance> taskInstanceList = this.findValidTaskListByProcessId(processInstance.getId());
                for(TaskInstance taskInstance : taskInstanceList){
                    taskInstance.setFlag(Flag.NO);
                    this.updateTaskInstance(taskInstance);
                }
                break;
            case REPEAT_RUNNING:
                // delete the recover task names from command parameter
                if(cmdParam.containsKey(Constants.CMDPARAM_RECOVERY_START_NODE_STRING)){
                    cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING);
张世鸣 已提交
640
                    processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
L
ligang 已提交
641 642 643 644 645 646 647 648
                }
                // delete all the valid tasks when repeat running
                List<TaskInstance> validTaskList = findValidTaskListByProcessId(processInstance.getId());
                for(TaskInstance taskInstance : validTaskList){
                    taskInstance.setFlag(Flag.NO);
                    updateTaskInstance(taskInstance);
                }
                processInstance.setStartTime(new Date());
649
                processInstance.setEndTime(null);
L
ligang 已提交
650 651 652 653 654 655 656 657
                processInstance.setRunTimes(runTime +1);
                initComplementDataParam(processDefinition, processInstance, cmdParam);
                break;
            case SCHEDULER:
                break;
            default:
                break;
        }
658
        processInstance.setState(runStatus);
L
ligang 已提交
659 660 661 662 663
        return processInstance;
    }

    /**
     * return complement data if the process start with complement data
664 665 666
     * @param processInstance processInstance
     * @param command command
     * @return command type
L
ligang 已提交
667 668 669 670 671 672 673 674 675 676 677
     */
    private CommandType getCommandTypeIfComplement(ProcessInstance processInstance, Command command){
        if(CommandType.COMPLEMENT_DATA == processInstance.getCmdTypeIfComplement()){
            return CommandType.COMPLEMENT_DATA;
        }else{
            return command.getCommandType();
        }
    }

    /**
     * initialize complement data parameters
678 679 680
     * @param processDefinition processDefinition
     * @param processInstance processInstance
     * @param cmdParam cmdParam
L
ligang 已提交
681
     */
682 683 684
    private void initComplementDataParam(ProcessDefinition processDefinition,
                                         ProcessInstance processInstance,
                                         Map<String, String> cmdParam) {
L
ligang 已提交
685 686 687 688 689 690
        if(!processInstance.isComplementData()){
            return;
        }

        Date startComplementTime = DateUtils.parse(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE),
                YYYY_MM_DD_HH_MM_SS);
691
        processInstance.setScheduleTime(startComplementTime);
L
ligang 已提交
692 693 694 695 696 697 698
        processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
                processDefinition.getGlobalParamMap(),
                processDefinition.getGlobalParamList(),
                CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));

    }

699

L
ligang 已提交
700 701 702
    /**
     * set sub work process parameters.
     * handle sub work process instance, update relation table and command parameters
703 704 705
     * set sub work process flag, extends parent work process command parameters
     * @param subProcessInstance subProcessInstance
     * @return process instance
L
ligang 已提交
706
     */
707 708
    public ProcessInstance setSubProcessParam(ProcessInstance subProcessInstance){
        String cmdParam = subProcessInstance.getCommandParam();
L
ligang 已提交
709
        if(StringUtils.isEmpty(cmdParam)){
710
            return subProcessInstance;
L
ligang 已提交
711 712 713 714 715 716
        }
        Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
        // write sub process id into cmd param.
        if(paramMap.containsKey(CMDPARAM_SUB_PROCESS)
                && CMDPARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMDPARAM_SUB_PROCESS))){
            paramMap.remove(CMDPARAM_SUB_PROCESS);
717
            paramMap.put(CMDPARAM_SUB_PROCESS, String.valueOf(subProcessInstance.getId()));
张世鸣 已提交
718
            subProcessInstance.setCommandParam(JSONUtils.toJsonString(paramMap));
719 720
            subProcessInstance.setIsSubProcess(Flag.YES);
            this.saveProcessInstance(subProcessInstance);
L
ligang 已提交
721 722 723 724 725 726
        }
        // copy parent instance user def params to sub process..
        String parentInstanceId = paramMap.get(CMDPARAM_SUB_PROCESS_PARENT_INSTANCE_ID);
        if(StringUtils.isNotEmpty(parentInstanceId)){
            ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId));
            if(parentInstance != null){
727 728 729
                subProcessInstance.setGlobalParams(
                        joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams()));
                this.saveProcessInstance(subProcessInstance);
L
ligang 已提交
730 731 732 733 734 735
            }else{
                logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam);
            }
        }
        ProcessInstanceMap processInstanceMap = JSONUtils.parseObject(cmdParam, ProcessInstanceMap.class);
        if(processInstanceMap == null || processInstanceMap.getParentProcessInstanceId() == 0){
736
            return subProcessInstance;
L
ligang 已提交
737 738
        }
        // update sub process id to process map table
739
        processInstanceMap.setProcessInstanceId(subProcessInstance.getId());
L
ligang 已提交
740 741

        this.updateWorkProcessInstanceMap(processInstanceMap);
742 743 744 745 746
        return subProcessInstance;
    }

    /**
     * join parent global params into sub process.
747 748 749 750
     * only the keys doesn't in sub process global would be joined.
     * @param parentGlobalParams parentGlobalParams
     * @param subGlobalParams subGlobalParams
     * @return global params join
751 752
     */
    private String joinGlobalParams(String parentGlobalParams, String subGlobalParams){
S
simon824 已提交
753 754 755

        List<Property> parentPropertyList = JSONUtils.toList(parentGlobalParams, Property.class);
        List<Property> subPropertyList = JSONUtils.toList(subGlobalParams, Property.class);
S
simon824 已提交
756

B
add esc  
baoliang 已提交
757
        Map<String,String> subMap = subPropertyList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
758

B
add esc  
baoliang 已提交
759 760 761
        for(Property parent : parentPropertyList){
            if(!subMap.containsKey(parent.getProp())){
                subPropertyList.add(parent);
762 763
            }
        }
张世鸣 已提交
764
        return JSONUtils.toJsonString(subPropertyList);
L
ligang 已提交
765 766 767 768
    }

    /**
     * initialize task instance
769
     * @param taskInstance taskInstance
L
ligang 已提交
770 771
     */
    private void initTaskInstance(TaskInstance taskInstance){
leon-baoliang's avatar
leon-baoliang 已提交
772 773 774 775 776 777 778

        if(!taskInstance.isSubProcess()){
            if(taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure()){
                taskInstance.setFlag(Flag.NO);
                updateTaskInstance(taskInstance);
                return;
            }
L
ligang 已提交
779
        }
leon-baoliang's avatar
leon-baoliang 已提交
780 781
        taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
        updateTaskInstance(taskInstance);
L
ligang 已提交
782 783 784
    }

    /**
785
     * submit task to db
786 787 788
     * submit sub process to command
     * @param taskInstance taskInstance
     * @return task instance
L
ligang 已提交
789
     */
790
    @Transactional(rollbackFor = Exception.class)
791 792 793 794
    public TaskInstance submitTask(TaskInstance taskInstance){
        ProcessInstance processInstance = this.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
        logger.info("start submit task : {}, instance id:{}, state: {}",
                taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState());
795 796 797 798 799 800 801 802 803 804 805 806
        //submit to db
        TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance);
        if(task == null){
            logger.error("end submit task to db error, task name:{}, process id:{} state: {} ",
                    taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState());
            return task;
        }
        if(!task.getState().typeIsFinished()){
            createSubWorkProcessCommand(processInstance, task);
        }

        logger.info("end submit task to db successfully:{} state:{} complete, instance id:{} state: {}  ",
L
ligang 已提交
807 808 809 810 811 812
                taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState());
        return task;
    }

    /**
     * set work process instance map
813 814 815
     * @param parentInstance parentInstance
     * @param parentTask parentTask
     * @return process instance map
L
ligang 已提交
816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841
     */
    private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask){
        ProcessInstanceMap processMap = findWorkProcessMapByParent(parentInstance.getId(), parentTask.getId());
        if(processMap != null){
            return processMap;
        }else if(parentInstance.getCommandType() == CommandType.REPEAT_RUNNING
                || parentInstance.isComplementData()){
            // update current task id to map
            // repeat running  does not generate new sub process instance
            processMap = findPreviousTaskProcessMap(parentInstance, parentTask);
            if(processMap!= null){
                processMap.setParentTaskInstanceId(parentTask.getId());
                updateWorkProcessInstanceMap(processMap);
                return processMap;
            }
        }
        // new task
        processMap = new ProcessInstanceMap();
        processMap.setParentProcessInstanceId(parentInstance.getId());
        processMap.setParentTaskInstanceId(parentTask.getId());
        createWorkProcessInstanceMap(processMap);
        return processMap;
    }

    /**
     * find previous task work process map.
842 843 844
     * @param parentProcessInstance parentProcessInstance
     * @param parentTask parentTask
     * @return process instance map
L
ligang 已提交
845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866
     */
    private ProcessInstanceMap findPreviousTaskProcessMap(ProcessInstance parentProcessInstance,
                                                          TaskInstance parentTask) {

        Integer preTaskId = 0;
        List<TaskInstance> preTaskList = this.findPreviousTaskListByWorkProcessId(parentProcessInstance.getId());
        for(TaskInstance task : preTaskList){
            if(task.getName().equals(parentTask.getName())){
                preTaskId = task.getId();
                ProcessInstanceMap map = findWorkProcessMapByParent(parentProcessInstance.getId(), preTaskId);
                if(map!=null){
                    return map;
                }
            }
        }
        logger.info("sub process instance is not found,parent task:{},parent instance:{}",
                parentTask.getId(), parentProcessInstance.getId());
        return null;
    }

    /**
     * create sub work process command
867 868
     * @param parentProcessInstance parentProcessInstance
     * @param task task
L
ligang 已提交
869 870
     */
    private void createSubWorkProcessCommand(ProcessInstance parentProcessInstance,
871 872 873 874 875 876 877 878 879
                                             TaskInstance task){
        if(!task.isSubProcess()){
            return;
        }
        ProcessInstanceMap instanceMap = setProcessInstanceMap(parentProcessInstance, task);
        TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
        Map<String, String> subProcessParam = JSONUtils.toMap(taskNode.getParams());
        Integer childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));

L
ligang 已提交
880 881 882 883
        ProcessInstance childInstance = findSubProcessInstance(parentProcessInstance.getId(), task.getId());

        CommandType fatherType = parentProcessInstance.getCommandType();
        CommandType commandType = fatherType;
884
        if(childInstance == null || commandType == CommandType.REPEAT_RUNNING){
L
ligang 已提交
885 886 887 888 889 890 891 892 893 894 895 896 897 898
            String fatherHistoryCommand = parentProcessInstance.getHistoryCmd();
            // sub process must begin with schedule/complement data
            // if father begin with scheduler/complement data
            if(fatherHistoryCommand.startsWith(CommandType.SCHEDULER.toString()) ||
                    fatherHistoryCommand.startsWith(CommandType.COMPLEMENT_DATA.toString())){
                commandType = CommandType.valueOf(fatherHistoryCommand.split(Constants.COMMA)[0]);
            }
        }

        if(childInstance != null){
            childInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
            updateProcessInstance(childInstance);
        }
        // set sub work process command
张世鸣 已提交
899
        String processMapStr = JSONUtils.toJsonString(instanceMap);
L
ligang 已提交
900 901 902 903 904 905 906 907 908
        Map<String, String> cmdParam = JSONUtils.toMap(processMapStr);

        if(commandType == CommandType.COMPLEMENT_DATA ||
                (childInstance != null && childInstance.isComplementData())){
            Map<String, String> parentParam = JSONUtils.toMap(parentProcessInstance.getCommandParam());
            String endTime =  parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
            String startTime =  parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
            cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endTime);
            cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startTime);
张世鸣 已提交
909
            processMapStr = JSONUtils.toJsonString(cmdParam);
L
ligang 已提交
910
        }
911 912 913

        updateSubProcessDefinitionByParent(parentProcessInstance, childDefineId);

L
ligang 已提交
914 915 916 917 918 919 920 921 922 923 924 925 926 927
        Command command = new Command();
        command.setWarningType(parentProcessInstance.getWarningType());
        command.setWarningGroupId(parentProcessInstance.getWarningGroupId());
        command.setFailureStrategy(parentProcessInstance.getFailureStrategy());
        command.setProcessDefinitionId(childDefineId);
        command.setScheduleTime(parentProcessInstance.getScheduleTime());
        command.setExecutorId(parentProcessInstance.getExecutorId());
        command.setCommandParam(processMapStr);
        command.setCommandType(commandType);
        command.setProcessInstancePriority(parentProcessInstance.getProcessInstancePriority());
        createCommand(command);
        logger.info("sub process command created: {} ", command.toString());
    }

928 929 930 931 932
    /**
     * update sub process definition
     * @param parentProcessInstance parentProcessInstance
     * @param childDefinitionId childDefinitionId
     */
933 934 935 936 937 938
    private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, int childDefinitionId) {
        ProcessDefinition fatherDefinition = this.findProcessDefineById(parentProcessInstance.getProcessDefinitionId());
        ProcessDefinition childDefinition = this.findProcessDefineById(childDefinitionId);
        if(childDefinition != null && fatherDefinition != null){
            childDefinition.setReceivers(fatherDefinition.getReceivers());
            childDefinition.setReceiversCc(fatherDefinition.getReceiversCc());
B
bao liang 已提交
939
            processDefineMapper.updateById(childDefinition);
940 941 942
        }
    }

L
ligang 已提交
943 944
    /**
     * submit task to mysql
945 946 947
     * @param taskInstance taskInstance
     * @param processInstance processInstance
     * @return task instance
L
ligang 已提交
948
     */
949
    public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance){
L
ligang 已提交
950 951 952 953 954 955 956 957 958 959 960 961 962
        ExecutionStatus processInstanceState = processInstance.getState();

        if(taskInstance.getState().typeIsFailure()){
            if(taskInstance.isSubProcess()){
                taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1 );
            }else {

                if( processInstanceState != ExecutionStatus.READY_STOP
                        && processInstanceState != ExecutionStatus.READY_PAUSE){
                    // failure task set invalid
                    taskInstance.setFlag(Flag.NO);
                    updateTaskInstance(taskInstance);
                    // crate new task instance
963 964 965 966 967
                    if(taskInstance.getState() != ExecutionStatus.NEED_FAULT_TOLERANCE){
                        taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1 );
                    }
                    taskInstance.setEndTime(null);
                    taskInstance.setStartTime(new Date());
L
ligang 已提交
968 969 970 971 972 973
                    taskInstance.setFlag(Flag.YES);
                    taskInstance.setHost(null);
                    taskInstance.setId(0);
                }
            }
        }
974
        taskInstance.setExecutorId(processInstance.getExecutorId());
L
ligang 已提交
975 976 977
        taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority());
        taskInstance.setState(getSubmitTaskState(taskInstance, processInstanceState));
        taskInstance.setSubmitTime(new Date());
978 979 980 981
        boolean saveResult = saveTaskInstance(taskInstance);
        if(!saveResult){
            return null;
        }
L
ligang 已提交
982 983 984 985 986
        return taskInstance;
    }


    /**
987
     * ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskInstanceId}_${task executed by ip1},${ip2}...
L
ligang 已提交
988
     * The tasks with the highest priority are selected by comparing the priorities of the above four levels from high to low.
989 990
     * @param taskInstance taskInstance
     * @return task zk queue str
L
ligang 已提交
991
     */
992
    public String taskZkInfo(TaskInstance taskInstance) {
993

L
lgcareer 已提交
994
        String taskWorkerGroup = getTaskWorkerGroup(taskInstance);
995 996 997 998 999
        ProcessInstance processInstance = this.findProcessInstanceById(taskInstance.getProcessInstanceId());
        if(processInstance == null){
            logger.error("process instance is null. please check the task info, task id: " + taskInstance.getId());
            return "";
        }
1000 1001 1002

        StringBuilder sb = new StringBuilder(100);

1003
        sb.append(processInstance.getProcessInstancePriority().ordinal()).append(Constants.UNDERLINE)
1004 1005
                .append(taskInstance.getProcessInstanceId()).append(Constants.UNDERLINE)
                .append(taskInstance.getTaskInstancePriority().ordinal()).append(Constants.UNDERLINE)
L
lgcareer 已提交
1006 1007
                .append(taskInstance.getId()).append(Constants.UNDERLINE)
                .append(taskInstance.getWorkerGroup());
1008 1009

        return  sb.toString();
L
ligang 已提交
1010 1011 1012 1013 1014 1015 1016 1017 1018 1019
    }

    /**
     * get submit task instance state by the work process state
     * cannot modify the task state when running/kill/submit success, or this
     * task instance is already exists in task queue .
     * return pause if work process state is ready pause
     * return stop if work process state is ready stop
     * if all of above are not satisfied, return submit success
     *
1020 1021 1022
     * @param taskInstance taskInstance
     * @param processInstanceState processInstanceState
     * @return process instance state
L
ligang 已提交
1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039
     */
    public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ExecutionStatus processInstanceState){
        ExecutionStatus state = taskInstance.getState();
        if(
                // running or killed
                // the task already exists in task queue
                // return state
                state == ExecutionStatus.RUNNING_EXEUTION
                        || state == ExecutionStatus.KILL
                        || checkTaskExistsInTaskQueue(taskInstance)
                ){
            return state;
        }
        //return pasue /stop if process instance state is ready pause / stop
        // or return submit success
        if( processInstanceState == ExecutionStatus.READY_PAUSE){
            state = ExecutionStatus.PAUSE;
1040 1041
        }else if(processInstanceState == ExecutionStatus.READY_STOP
                || !checkProcessStrategy(taskInstance)) {
L
ligang 已提交
1042 1043 1044 1045 1046 1047 1048
            state = ExecutionStatus.KILL;
        }else{
            state = ExecutionStatus.SUBMITTED_SUCCESS;
        }
        return state;
    }

1049 1050 1051 1052 1053
    /**
     *  check process instance strategy
     * @param taskInstance taskInstance
     * @return check strategy result
     */
1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069
    private boolean checkProcessStrategy(TaskInstance taskInstance){
        ProcessInstance processInstance = this.findProcessInstanceById(taskInstance.getProcessInstanceId());
        FailureStrategy failureStrategy = processInstance.getFailureStrategy();
        if(failureStrategy == FailureStrategy.CONTINUE){
            return true;
        }
        List<TaskInstance> taskInstances = this.findValidTaskListByProcessId(taskInstance.getProcessInstanceId());

        for(TaskInstance task : taskInstances){
            if(task.getState() == ExecutionStatus.FAILURE){
                return false;
            }
        }
        return true;
    }

L
ligang 已提交
1070 1071
    /**
     * check the task instance existing in queue
1072 1073
     * @param taskInstance taskInstance
     * @return whether taskinstance exists queue
L
ligang 已提交
1074
     */
1075 1076
    public boolean checkTaskExistsInTaskQueue(TaskInstance taskInstance){
        if(taskInstance.isSubProcess()){
L
ligang 已提交
1077 1078 1079
            return false;
        }

1080
        String taskZkInfo = taskZkInfo(taskInstance);
L
ligang 已提交
1081

Q
qiaozhanwei 已提交
1082
        return false;
L
ligang 已提交
1083 1084 1085 1086
    }

    /**
     * create a new process instance
1087
     * @param processInstance processInstance
L
ligang 已提交
1088 1089 1090 1091 1092 1093 1094 1095 1096 1097
     */
    public void createProcessInstance(ProcessInstance processInstance){

        if (processInstance != null){
            processInstanceMapper.insert(processInstance);
        }
    }

    /**
     * insert or update work process instance to data base
1098
     * @param processInstance processInstance
L
ligang 已提交
1099
     */
1100
    public void saveProcessInstance(ProcessInstance processInstance){
L
ligang 已提交
1101

1102
        if (processInstance == null){
L
ligang 已提交
1103 1104 1105
            logger.error("save error, process instance is null!");
            return ;
        }
1106 1107
        if(processInstance.getId() != 0){
            processInstanceMapper.updateById(processInstance);
L
ligang 已提交
1108
        }else{
1109
            createProcessInstance(processInstance);
L
ligang 已提交
1110 1111 1112 1113 1114
        }
    }

    /**
     * insert or update command
1115 1116
     * @param command command
     * @return save command result
L
ligang 已提交
1117 1118 1119
     */
    public int saveCommand(Command command){
        if(command.getId() != 0){
B
bao liang 已提交
1120
            return commandMapper.updateById(command);
L
ligang 已提交
1121 1122 1123 1124 1125 1126 1127
        }else{
            return commandMapper.insert(command);
        }
    }

    /**
     *  insert or update task instance
1128 1129
     * @param taskInstance taskInstance
     * @return save task instance result
L
ligang 已提交
1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140
     */
    public boolean saveTaskInstance(TaskInstance taskInstance){
        if(taskInstance.getId() != 0){
            return updateTaskInstance(taskInstance);
        }else{
            return createTaskInstance(taskInstance);
        }
    }

    /**
     * insert task instance
1141 1142
     * @param taskInstance taskInstance
     * @return create task instance result
L
ligang 已提交
1143 1144 1145 1146 1147 1148 1149 1150
     */
    public boolean createTaskInstance(TaskInstance taskInstance) {
        int count = taskInstanceMapper.insert(taskInstance);
        return count > 0;
    }

    /**
     * update task instance
1151 1152
     * @param taskInstance taskInstance
     * @return update task instance result
L
ligang 已提交
1153 1154
     */
    public boolean updateTaskInstance(TaskInstance taskInstance){
B
bao liang 已提交
1155
        int count = taskInstanceMapper.updateById(taskInstance);
L
ligang 已提交
1156 1157 1158 1159
        return count > 0;
    }
    /**
     * delete a command by id
1160
     * @param id  id
L
ligang 已提交
1161 1162
     */
    public void delCommandByid(int id) {
B
bao liang 已提交
1163
        commandMapper.deleteById(id);
L
ligang 已提交
1164 1165
    }

1166 1167 1168 1169 1170
    /**
     * find task instance by id
     * @param taskId task id
     * @return task intance
     */
L
ligang 已提交
1171
    public TaskInstance findTaskInstanceById(Integer taskId){
B
bao liang 已提交
1172
        return taskInstanceMapper.selectById(taskId);
L
ligang 已提交
1173 1174
    }

journey2018's avatar
journey2018 已提交
1175 1176 1177

    /**
     * package task instance,associate processInstance and processDefine
1178 1179
     * @param taskInstId taskInstId
     * @return task instance
journey2018's avatar
journey2018 已提交
1180
     */
1181
    public TaskInstance getTaskInstanceDetailByTaskId(int taskInstId){
journey2018's avatar
journey2018 已提交
1182 1183
        // get task instance
        TaskInstance taskInstance = findTaskInstanceById(taskInstId);
1184 1185 1186
        if(taskInstance == null){
            return taskInstance;
        }
journey2018's avatar
journey2018 已提交
1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197
        // get process instance
        ProcessInstance processInstance = findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
        // get process define
        ProcessDefinition processDefine = findProcessDefineById(taskInstance.getProcessDefinitionId());

        taskInstance.setProcessInstance(processInstance);
        taskInstance.setProcessDefine(processDefine);
        return taskInstance;
    }


L
ligang 已提交
1198 1199
    /**
     * get id list by task state
1200 1201 1202
     * @param instanceId instanceId
     * @param state state
     * @return task instance states
L
ligang 已提交
1203 1204 1205 1206 1207 1208 1209
     */
    public List<Integer> findTaskIdByInstanceState(int instanceId, ExecutionStatus state){
        return taskInstanceMapper.queryTaskByProcessIdAndState(instanceId, state.ordinal());
    }

    /**
     * find valid task list by process definition id
1210 1211
     * @param processInstanceId processInstanceId
     * @return task instance list
L
ligang 已提交
1212 1213 1214 1215 1216 1217 1218
     */
    public List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId){
         return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.YES);
    }

    /**
     * find previous task list by work process id
1219 1220
     * @param processInstanceId processInstanceId
     * @return task instance list
L
ligang 已提交
1221
     */
1222 1223
    public List<TaskInstance> findPreviousTaskListByWorkProcessId(Integer processInstanceId){
        return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.NO);
L
ligang 已提交
1224 1225 1226 1227
    }

    /**
     * update work process instance map
1228 1229
     * @param processInstanceMap processInstanceMap
     * @return update process instance result
L
ligang 已提交
1230 1231
     */
    public int updateWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap){
B
bao liang 已提交
1232
        return processInstanceMapMapper.updateById(processInstanceMap);
L
ligang 已提交
1233 1234 1235 1236 1237
    }


    /**
     * create work process instance map
1238 1239
     * @param processInstanceMap processInstanceMap
     * @return create process instance result
L
ligang 已提交
1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250
     */
    public int createWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap){
        Integer count = 0;
        if(processInstanceMap !=null){
            return  processInstanceMapMapper.insert(processInstanceMap);
        }
        return count;
    }

    /**
     * find work process map by parent process id and parent task id.
1251 1252 1253
     * @param parentWorkProcessId parentWorkProcessId
     * @param parentTaskId parentTaskId
     * @return process instance map
L
ligang 已提交
1254 1255 1256 1257 1258 1259 1260
     */
    public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId){
        return processInstanceMapMapper.queryByParentId(parentWorkProcessId, parentTaskId);
    }

    /**
     * delete work process map by parent process id
1261 1262
     * @param parentWorkProcessId parentWorkProcessId
     * @return delete process map result
L
ligang 已提交
1263 1264 1265 1266 1267 1268
     */
    public int deleteWorkProcessMapByParentId(int parentWorkProcessId){
        return processInstanceMapMapper.deleteByParentProcessId(parentWorkProcessId);

    }

1269 1270 1271 1272 1273 1274
    /**
     * find sub process instance
     * @param parentProcessId parentProcessId
     * @param parentTaskId parentTaskId
     * @return process instance
     */
L
ligang 已提交
1275 1276 1277 1278 1279 1280 1281 1282 1283
    public ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId){
        ProcessInstance processInstance = null;
        ProcessInstanceMap processInstanceMap = processInstanceMapMapper.queryByParentId(parentProcessId, parentTaskId);
        if(processInstanceMap == null || processInstanceMap.getProcessInstanceId() == 0){
            return processInstance;
        }
        processInstance = findProcessInstanceById(processInstanceMap.getProcessInstanceId());
        return processInstance;
    }
1284 1285 1286 1287 1288 1289

    /**
     * find parent process instance
     * @param subProcessId subProcessId
     * @return process instance
     */
L
ligang 已提交
1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302
    public ProcessInstance findParentProcessInstance(Integer subProcessId) {
        ProcessInstance processInstance = null;
        ProcessInstanceMap processInstanceMap = processInstanceMapMapper.queryBySubProcessId(subProcessId);
        if(processInstanceMap == null || processInstanceMap.getProcessInstanceId() == 0){
            return processInstance;
        }
        processInstance = findProcessInstanceById(processInstanceMap.getParentProcessInstanceId());
        return processInstance;
    }


    /**
     * change task state
1303 1304 1305 1306 1307 1308
     * @param state state
     * @param startTime startTime
     * @param host host
     * @param executePath executePath
     * @param logPath logPath
     * @param taskInstId taskInstId
L
ligang 已提交
1309 1310 1311 1312 1313
     */
    public void changeTaskState(ExecutionStatus state, Date startTime, String host,
                                String executePath,
                                String logPath,
                                int taskInstId) {
B
bao liang 已提交
1314
        TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstId);
L
ligang 已提交
1315 1316 1317 1318 1319 1320 1321 1322 1323 1324
        taskInstance.setState(state);
        taskInstance.setStartTime(startTime);
        taskInstance.setHost(host);
        taskInstance.setExecutePath(executePath);
        taskInstance.setLogPath(logPath);
        saveTaskInstance(taskInstance);
    }

    /**
     * update process instance
1325 1326
     * @param processInstance processInstance
     * @return update process instance result
L
ligang 已提交
1327
     */
1328 1329
    public int updateProcessInstance(ProcessInstance processInstance){
        return processInstanceMapper.updateById(processInstance);
L
ligang 已提交
1330 1331 1332 1333
    }

    /**
     * update the process instance
1334 1335 1336 1337 1338 1339 1340 1341
     * @param processInstanceId processInstanceId
     * @param processJson processJson
     * @param globalParams globalParams
     * @param scheduleTime scheduleTime
     * @param flag flag
     * @param locations locations
     * @param connects connects
     * @return update process instance result
L
ligang 已提交
1342 1343 1344 1345
     */
    public int updateProcessInstance(Integer processInstanceId, String processJson,
                                     String globalParams, Date scheduleTime, Flag flag,
                                     String locations, String connects){
B
bao liang 已提交
1346 1347 1348 1349 1350 1351 1352 1353 1354 1355
        ProcessInstance processInstance = processInstanceMapper.queryDetailById(processInstanceId);
        if(processInstance!= null){
            processInstance.setProcessInstanceJson(processJson);
            processInstance.setGlobalParams(globalParams);
            processInstance.setScheduleTime(scheduleTime);
            processInstance.setLocations(locations);
            processInstance.setConnects(connects);
            return processInstanceMapper.updateById(processInstance);
        }
        return 0;
L
ligang 已提交
1356 1357 1358 1359
    }

    /**
     * change task state
1360 1361 1362
     * @param state state
     * @param endTime endTime
     * @param taskInstId taskInstId
L
ligang 已提交
1363 1364 1365
     */
    public void changeTaskState(ExecutionStatus state,
                                Date endTime,
Q
qiaozhanwei 已提交
1366 1367
                                int processId,
                                String appIds,
L
ligang 已提交
1368
                                int taskInstId) {
B
bao liang 已提交
1369
        TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstId);
Q
qiaozhanwei 已提交
1370 1371
        taskInstance.setPid(processId);
        taskInstance.setAppLink(appIds);
L
ligang 已提交
1372 1373 1374 1375 1376 1377 1378
        taskInstance.setState(state);
        taskInstance.setEndTime(endTime);
        saveTaskInstance(taskInstance);
    }

    /**
     * convert integer list to string list
1379 1380
     * @param intList intList
     * @return string list
L
ligang 已提交
1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394
     */
    public List<String> convertIntListToString(List<Integer> intList){
        if(intList == null){
            return new ArrayList<>();
        }
        List<String> result = new ArrayList<String>(intList.size());
        for(Integer intVar : intList){
            result.add(String.valueOf(intVar));
        }
        return result;
    }

    /**
     * update pid and app links field by task instance id
1395 1396 1397
     * @param taskInstId taskInstId
     * @param pid pid
     * @param appLinks appLinks
L
ligang 已提交
1398 1399 1400
     */
    public void updatePidByTaskInstId(int taskInstId, int pid,String appLinks) {

B
bao liang 已提交
1401
        TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstId);
L
ligang 已提交
1402 1403 1404 1405 1406 1407
        taskInstance.setPid(pid);
        taskInstance.setAppLink(appLinks);
        saveTaskInstance(taskInstance);
    }

    /**
1408 1409 1410
     * query schedule by id
     * @param id id
     * @return schedule
L
ligang 已提交
1411 1412
     */
    public Schedule querySchedule(int id) {
B
bao liang 已提交
1413
        return scheduleMapper.selectById(id);
L
ligang 已提交
1414 1415
    }

1416 1417 1418 1419 1420 1421 1422 1423 1424
    /**
     * query Schedule by processDefinitionId
     * @param processDefinitionId processDefinitionId
     * @see Schedule
     */
    public List<Schedule> queryReleaseSchedulerListByProcessDefinitionId(int processDefinitionId) {
        return scheduleMapper.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId);
    }

L
ligang 已提交
1425
    /**
1426 1427 1428
     * query need failover process instance
     * @param host host
     * @return process instance list
L
ligang 已提交
1429
     */
1430 1431
    public List<ProcessInstance> queryNeedFailoverProcessInstances(String host){

B
bao liang 已提交
1432
        return processInstanceMapper.queryByHostAndStatus(host, stateArray);
1433 1434
    }

L
ligang 已提交
1435 1436
    /**
     * process need failover process instance
1437
     * @param processInstance processInstance
L
ligang 已提交
1438
     */
1439
    @Transactional(rollbackFor = Exception.class)
L
ligang 已提交
1440 1441
    public void processNeedFailoverProcessInstances(ProcessInstance processInstance){
        //1 update processInstance host is null
1442
        processInstance.setHost("null");
B
bao liang 已提交
1443
        processInstanceMapper.updateById(processInstance);
L
ligang 已提交
1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455

        //2 insert into recover command
        Command cmd = new Command();
        cmd.setProcessDefinitionId(processInstance.getProcessDefinitionId());
        cmd.setCommandParam(String.format("{\"%s\":%d}", Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
        cmd.setExecutorId(processInstance.getExecutorId());
        cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
        createCommand(cmd);
    }

    /**
     * query all need failover task instances by host
1456 1457
     * @param host host
     * @return task instance list
L
ligang 已提交
1458 1459
     */
    public List<TaskInstance> queryNeedFailoverTaskInstances(String host){
B
bao liang 已提交
1460
        return taskInstanceMapper.queryByHostAndStatus(host,
B
bao liang 已提交
1461
                stateArray);
L
ligang 已提交
1462 1463 1464 1465
    }

    /**
     * find data source by id
1466 1467
     * @param id id
     * @return datasource
L
ligang 已提交
1468 1469
     */
    public DataSource findDataSourceById(int id){
B
bao liang 已提交
1470
        return dataSourceMapper.selectById(id);
L
ligang 已提交
1471 1472 1473 1474 1475
    }


    /**
     * update process instance state by id
1476 1477 1478
     * @param processInstanceId processInstanceId
     * @param executionStatus executionStatus
     * @return update process result
L
ligang 已提交
1479 1480
     */
    public int updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus) {
B
bao liang 已提交
1481 1482 1483
        ProcessInstance instance = processInstanceMapper.selectById(processInstanceId);
        instance.setState(executionStatus);
        return processInstanceMapper.updateById(instance);
L
ligang 已提交
1484 1485 1486 1487 1488

    }

    /**
     * find process instance by the task id
1489 1490
     * @param taskId taskId
     * @return process instance
L
ligang 已提交
1491 1492
     */
    public ProcessInstance findProcessInstanceByTaskId(int taskId){
B
bao liang 已提交
1493 1494 1495 1496 1497
        TaskInstance taskInstance = taskInstanceMapper.selectById(taskId);
        if(taskInstance!= null){
            return processInstanceMapper.selectById(taskInstance.getProcessInstanceId());
        }
        return null;
L
ligang 已提交
1498 1499 1500 1501
    }

    /**
     * find udf function list by id list string
1502 1503
     * @param ids ids
     * @return udf function list
L
ligang 已提交
1504
     */
1505
    public List<UdfFunc> queryUdfFunListByids(int[] ids){
B
bao liang 已提交
1506
        return udfFuncMapper.queryUdfByIdStr(ids, null);
L
ligang 已提交
1507 1508 1509 1510
    }

    /**
     * find tenant code by resource name
1511
     * @param resName resource name
1512
     * @param resourceType resource type
1513
     * @return tenant code
L
ligang 已提交
1514
     */
1515
    public String queryTenantCodeByResName(String resName,ResourceType resourceType){
1516
        return resourceMapper.queryTenantCodeByResourceName(resName, resourceType.ordinal());
L
ligang 已提交
1517 1518 1519 1520
    }

    /**
     * find schedule list by process define id.
1521 1522
     * @param ids ids
     * @return schedule list
L
ligang 已提交
1523 1524
     */
    public List<Schedule> selectAllByProcessDefineId(int[] ids){
B
bao liang 已提交
1525
        return scheduleMapper.selectAllByProcessDefineArray(
B
bao liang 已提交
1526
                ids);
L
ligang 已提交
1527 1528 1529 1530
    }

    /**
     * get dependency cycle by work process define id and scheduler fire time
1531 1532
     * @param masterId masterId
     * @param processDefinitionId processDefinitionId
Q
qiaozhanwei 已提交
1533
     * @param scheduledFireTime the time the task schedule is expected to trigger
1534 1535
     * @return CycleDependency
     * @throws Exception if error throws Exception
L
ligang 已提交
1536 1537 1538 1539 1540 1541 1542 1543 1544
     */
    public CycleDependency getCycleDependency(int masterId, int processDefinitionId, Date scheduledFireTime) throws Exception {
        List<CycleDependency> list = getCycleDependencies(masterId,new int[]{processDefinitionId},scheduledFireTime);
        return list.size()>0 ? list.get(0) : null;

    }

    /**
     * get dependency cycle list by work process define id list and scheduler fire time
1545 1546
     * @param masterId masterId
     * @param ids ids
Q
qiaozhanwei 已提交
1547
     * @param scheduledFireTime the time the task schedule is expected to trigger
1548 1549
     * @return CycleDependency list
     * @throws Exception if error throws Exception
L
ligang 已提交
1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567
     */
    public List<CycleDependency> getCycleDependencies(int masterId,int[] ids,Date scheduledFireTime) throws Exception {
        List<CycleDependency> cycleDependencyList =  new ArrayList<CycleDependency>();
        if(ArrayUtils.isEmpty(ids)){
            logger.warn("ids[] is empty!is invalid!");
            return cycleDependencyList;
        }
        if(scheduledFireTime == null){
            logger.warn("scheduledFireTime is null!is invalid!");
            return cycleDependencyList;
        }


        String strCrontab = "";
        CronExpression depCronExpression;
        Cron depCron;
        List<Date> list;
        List<Schedule> schedules = this.selectAllByProcessDefineId(ids);
Q
qiaozhanwei 已提交
1568
        // for all scheduling information
L
ligang 已提交
1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616
        for(Schedule depSchedule:schedules){
            strCrontab = depSchedule.getCrontab();
            depCronExpression = CronUtils.parse2CronExpression(strCrontab);
            depCron = CronUtils.parse2Cron(strCrontab);
            CycleEnum cycleEnum = CronUtils.getMiniCycle(depCron);
            if(cycleEnum == null){
                logger.error("{} is not valid",strCrontab);
                continue;
            }
            Calendar calendar = Calendar.getInstance();
            switch (cycleEnum){
                /*case MINUTE:
                    calendar.add(Calendar.MINUTE,-61);*/
                case HOUR:
                    calendar.add(Calendar.HOUR,-25);
                    break;
                case DAY:
                    calendar.add(Calendar.DATE,-32);
                    break;
                case WEEK:
                    calendar.add(Calendar.DATE,-32);
                    break;
                case MONTH:
                    calendar.add(Calendar.MONTH,-13);
                    break;
                default:
                    logger.warn("Dependent process definition's  cycleEnum is {},not support!!", cycleEnum.name());
                    continue;
            }
            Date start = calendar.getTime();

            if(depSchedule.getProcessDefinitionId() == masterId){
                list = CronUtils.getSelfFireDateList(start, scheduledFireTime, depCronExpression);
            }else {
                list = CronUtils.getFireDateList(start, scheduledFireTime, depCronExpression);
            }
            if(list.size()>=1){
                start = list.get(list.size()-1);
                CycleDependency dependency = new CycleDependency(depSchedule.getProcessDefinitionId(),start, CronUtils.getExpirationTime(start, cycleEnum), cycleEnum);
                cycleDependencyList.add(dependency);
            }

        }
        return cycleDependencyList;
    }

    /**
     * find last scheduler process instance in the date interval
1617 1618 1619
     * @param definitionId definitionId
     * @param dateInterval dateInterval
     * @return process instance
L
ligang 已提交
1620 1621 1622
     */
    public ProcessInstance findLastSchedulerProcessInterval(int definitionId, DateInterval dateInterval) {
        return processInstanceMapper.queryLastSchedulerProcess(definitionId,
B
bao liang 已提交
1623 1624
                dateInterval.getStartTime(),
                dateInterval.getEndTime());
L
ligang 已提交
1625 1626
    }

1627 1628 1629 1630 1631 1632
    /**
     * find last manual process instance interval
     * @param definitionId process definition id
     * @param dateInterval dateInterval
     * @return process instance
     */
L
ligang 已提交
1633 1634
    public ProcessInstance findLastManualProcessInterval(int definitionId, DateInterval dateInterval) {
        return processInstanceMapper.queryLastManualProcess(definitionId,
B
bao liang 已提交
1635 1636
                dateInterval.getStartTime(),
                dateInterval.getEndTime());
L
ligang 已提交
1637 1638
    }

1639 1640 1641
    /**
     * find last running process instance
     * @param definitionId  process definition id
1642 1643
     * @param startTime start time
     * @param endTime end time
1644 1645
     * @return process instance
     */
1646
    public ProcessInstance findLastRunningProcess(int definitionId, Date startTime, Date endTime) {
L
ligang 已提交
1647
        return processInstanceMapper.queryLastRunningProcess(definitionId,
1648 1649
                startTime,
                endTime,
L
ligang 已提交
1650 1651
                stateArray);
    }
1652 1653

    /**
1654 1655 1656
     * query user queue by process instance id
     * @param processInstanceId processInstanceId
     * @return queue
1657
     */
1658
    public String queryUserQueueByProcessInstanceId(int processInstanceId){
B
bao liang 已提交
1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669

        String queue = "";
        ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId);
        if(processInstance == null){
            return queue;
        }
        User executor = userMapper.selectById(processInstance.getExecutorId());
        if(executor != null){
            queue = executor.getQueue();
        }
        return queue;
1670 1671
    }

1672

B
baoliang 已提交
1673

1674
    /**
L
lgcareer 已提交
1675
     * get task worker group
1676 1677
     * @param taskInstance taskInstance
     * @return workerGroupId
1678
     */
L
lgcareer 已提交
1679 1680
    public String getTaskWorkerGroup(TaskInstance taskInstance) {
        String workerGroup = taskInstance.getWorkerGroup();
1681

L
lgcareer 已提交
1682 1683
        if(StringUtils.isNotBlank(workerGroup)){
            return workerGroup;
1684 1685
        }
        int processInstanceId = taskInstance.getProcessInstanceId();
1686
        ProcessInstance processInstance = findProcessInstanceById(processInstanceId);
1687

1688
        if(processInstance != null){
L
lgcareer 已提交
1689
            return processInstance.getWorkerGroup();
1690
        }
L
lgcareer 已提交
1691 1692
        logger.info("task : {} will use default worker group", taskInstance.getId());
        return Constants.DEFAULT_WORKER_GROUP;
1693 1694
    }

1695
    /**
1696 1697 1698
     * get have perm project list
     * @param userId userId
     * @return project list
1699
     */
1700 1701 1702
    public List<Project> getProjectListHavePerm(int userId){
        List<Project> createProjects = projectMapper.queryProjectCreatedByUser(userId);
        List<Project> authedProjects = projectMapper.queryAuthedProjectListByUserId(userId);
1703

B
bao liang 已提交
1704 1705 1706
        if(createProjects == null){
            createProjects = new ArrayList<>();
        }
1707

B
bao liang 已提交
1708 1709
        if(authedProjects != null){
            createProjects.addAll(authedProjects);
1710
        }
1711
        return createProjects;
B
bao liang 已提交
1712
    }
1713

1714 1715 1716 1717 1718
    /**
     * get have perm project ids
     * @param userId userId
     * @return project ids
     */
B
bao liang 已提交
1719 1720 1721 1722 1723 1724 1725
    public List<Integer> getProjectIdListHavePerm(int userId){

        List<Integer> projectIdList = new ArrayList<>();
        for(Project project : getProjectListHavePerm(userId)){
            projectIdList.add(project.getId());
        }
        return projectIdList;
1726 1727
    }

1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740
    /**
     * list unauthorized udf function
     * @param userId    user id
     * @param needChecks  data source id array
     * @return unauthorized udf function list
     */
    public <T> List<T> listUnauthorized(int userId,T[] needChecks,AuthorizationType authorizationType){
        List<T> resultList = new ArrayList<T>();

        if (!ArrayUtils.isEmpty(needChecks)) {
            Set<T> originResSet = new HashSet<T>(Arrays.asList(needChecks));

            switch (authorizationType){
1741 1742 1743 1744 1745 1746
                case RESOURCE_FILE_ID:
                    Set<Integer> authorizedResourceFiles = resourceMapper.listAuthorizedResourceById(userId, needChecks).stream().map(t -> t.getId()).collect(toSet());
                    originResSet.removeAll(authorizedResourceFiles);
                    break;
                case RESOURCE_FILE_NAME:
                    Set<String> authorizedResources = resourceMapper.listAuthorizedResource(userId, needChecks).stream().map(t -> t.getFullName()).collect(toSet());
1747 1748
                    originResSet.removeAll(authorizedResources);
                    break;
1749 1750 1751 1752
                case UDF_FILE:
                    Set<Integer> authorizedUdfFiles = resourceMapper.listAuthorizedResourceById(userId, needChecks).stream().map(t -> t.getId()).collect(toSet());
                    originResSet.removeAll(authorizedUdfFiles);
                    break;
1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774
                case DATASOURCE:
                    Set<Integer> authorizedDatasources = dataSourceMapper.listAuthorizedDataSource(userId,needChecks).stream().map(t -> t.getId()).collect(toSet());
                    originResSet.removeAll(authorizedDatasources);
                    break;
                case UDF:
                    Set<Integer> authorizedUdfs = udfFuncMapper.listAuthorizedUdfFunc(userId, needChecks).stream().map(t -> t.getId()).collect(toSet());
                    originResSet.removeAll(authorizedUdfs);
                    break;
            }

            resultList.addAll(originResSet);
        }

        return resultList;
    }

    /**
     * get user by user id
     * @param userId user id
     * @return User
     */
    public User getUserById(int userId){
1775
        return userMapper.queryDetailsById(userId);
1776 1777
    }

1778 1779 1780 1781 1782 1783 1784 1785 1786
    /**
     * get resource by resoruce id
     * @param resoruceId resource id
     * @return Resource
     */
    public Resource getResourceById(int resoruceId){
        return resourceMapper.selectById(resoruceId);
    }

1787

1788 1789 1790 1791 1792 1793 1794 1795 1796
    /**
     * list resources by ids
     * @param resIds resIds
     * @return resource list
     */
    public List<Resource> listResourceByIds(Integer[] resIds){
        return resourceMapper.listResourceByIds(resIds);
    }

1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813
    /**
     * format task app id in task instance
     * @param taskInstance
     * @return
     */
    public String formatTaskAppId(TaskInstance taskInstance){
        ProcessDefinition definition = this.findProcessDefineById(taskInstance.getProcessDefinitionId());
        ProcessInstance processInstanceById = this.findProcessInstanceById(taskInstance.getProcessInstanceId());

        if(definition == null || processInstanceById == null){
            return "";
        }
        return String.format("%s_%s_%s",
                definition.getId(),
                processInstanceById.getId(),
                taskInstance.getId());
    }
1814

L
ligang 已提交
1815
}