From f6fea06f1051f5962106e6299eea9696bddf12d4 Mon Sep 17 00:00:00 2001 From: hstdream <33045461+hstdream@users.noreply.github.com> Date: Sun, 12 Jun 2022 12:35:08 +0800 Subject: [PATCH] [Improve] Enhance complement function transformation (#10376) --- docs/docs/en/about/glossary.md | 2 +- docs/docs/zh/about/glossary.md | 2 +- .../api/controller/ExecutorController.java | 2 +- .../dolphinscheduler/api/enums/Status.java | 1 + .../api/service/impl/ExecutorServiceImpl.java | 306 ++++++++++++------ .../api/service/ExecutorServiceTest.java | 14 +- .../dolphinscheduler/common/Constants.java | 11 + .../runner/WorkflowExecuteRunnable.java | 152 +++++---- .../service/corn/CronUtils.java | 53 +-- 9 files changed, 344 insertions(+), 199 deletions(-) diff --git a/docs/docs/en/about/glossary.md b/docs/docs/en/about/glossary.md index 492fdee96..f8ad9355b 100644 --- a/docs/docs/en/about/glossary.md +++ b/docs/docs/en/about/glossary.md @@ -45,7 +45,7 @@ provided. **Continue** refers to regardless of the status of the task running in failure. **End** means that once a failed task is found, Kill will also run the parallel task at the same time, and the process fails and ends -**Complement**: Supplement historical data,Supports **interval parallel and serial** two complement methods +**Complement**: Supplement historical data. Supports two complement methods, **interval parallel and serial**, and two date selection types, **date range** and **date enumeration**.
### 2.Module introduction diff --git a/docs/docs/zh/about/glossary.md b/docs/docs/zh/about/glossary.md index f3ce5f3bd..dd3043cc2 100644 --- a/docs/docs/zh/about/glossary.md +++ b/docs/docs/zh/about/glossary.md @@ -30,7 +30,7 @@ **失败策略**:对于并行运行的任务,如果有任务失败,提供两种失败策略处理方式,**继续**是指不管并行运行任务的状态,直到流程失败结束。**结束**是指一旦发现失败任务,则同时Kill掉正在运行的并行任务,流程失败结束 -**补数**:补历史数据,支持**区间并行和串行**两种补数方式 +**补数**:补历史数据,支持**区间并行和串行**两种补数方式,其日期选择方式包括**日期范围**和**日期枚举**两种 ### 2.模块介绍 diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java index ba230d57a..90c4ecab5 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java @@ -87,7 +87,7 @@ public class ExecutorController extends BaseController { * @param loginUser login user * @param projectCode project code * @param processDefinitionCode process definition code - * @param scheduleTime schedule time + * @param scheduleTime schedule time; when CommandType is COMPLEMENT_DATA there are two ways to pass it: 1. date range, for example: {"complementStartDate":"2022-01-01 12:12:12","complementEndDate":"2022-01-06 12:12:12"} 2. date enumeration (manual input), for example: {"complementScheduleDateList":"2022-01-01 00:00:00,2022-01-02 12:12:12,2022-01-03 12:12:12"} * @param failureStrategy failure strategy * @param startNodeList start nodes list * @param taskDependType task depend type diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 1b1bbbe46..8f5b30168 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -407,6 +407,7 @@ public enum Status { NO_CURRENT_OPERATING_PERMISSION(1400001, "The current user does not have this permission.", "当前用户无此权限"), FUNCTION_DISABLED(1400002, "The current feature is disabled.", "当前功能已被禁用"), + SCHEDULE_TIME_NUMBER(1400003, "The number of complement dates exceed 100.", "补数日期个数超过100"), ; private final int code; diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 9fafacb4f..0d7376681 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -17,14 +17,12 @@ package org.apache.dolphinscheduler.api.service.impl; -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_START; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS; -import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT; - +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.Lists; +import org.apache.commons.beanutils.BeanUtils; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant; 
import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.api.enums.Status; @@ -69,13 +67,13 @@ import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.service.corn.CronUtils; import org.apache.dolphinscheduler.service.process.ProcessService; - -import org.apache.commons.beanutils.BeanUtils; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -84,12 +82,16 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import com.fasterxml.jackson.core.type.TypeReference; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_START; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS; +import static org.apache.dolphinscheduler.common.Constants.COMMA; +import static 
org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT; +import static org.apache.dolphinscheduler.common.Constants.SCHEDULE_TIME_MAX_LENGTH; /** * executor service impl @@ -187,11 +189,15 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ return result; } + if(!checkScheduleTimeNum(commandType,cronTime)){ + putMsg(result, Status.SCHEDULE_TIME_NUMBER); + return result; + } + // check master exists if (!checkMasterExists(result)) { return result; } - /** * create command */ @@ -228,6 +234,29 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ return true; } + /** + * + * @param complementData + * @param cronTime + * @return CommandType is COMPLEMENT_DATA and cronTime's number is not greater than 100 return true , otherwise return false + */ + private boolean checkScheduleTimeNum(CommandType complementData,String cronTime) { + if (!CommandType.COMPLEMENT_DATA.equals(complementData)) { + return true; + } + if(cronTime == null){ + return true; + } + Map cronMap = JSONUtils.toMap(cronTime); + if (cronMap.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) { + String[] stringDates = cronMap.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST).split(COMMA); + if (stringDates.length > SCHEDULE_TIME_MAX_LENGTH) { + return false; + } + } + return true; + } + /** * check whether the process definition can be executed * @@ -655,27 +684,16 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ } command.setProcessInstanceId(0); - Date start = null; - Date end = null; - if (!StringUtils.isEmpty(schedule)) { - String[] interval = schedule.split(","); - if (interval.length == 2) { - start = DateUtils.getScheduleDate(interval[0]); - end = DateUtils.getScheduleDate(interval[1]); - if (start.after(end)) { - logger.info("complement data error, wrong date start:{} and end date:{} ", - start, end - ); - return 0; - } - } - } // determine whether to complement if (commandType == 
CommandType.COMPLEMENT_DATA) { - if (start == null || end == null) { + if (schedule == null || StringUtils.isEmpty(schedule)) { return 0; } - return createComplementCommandList(start, end, runMode, command, expectedParallelismNumber, complementDependentMode); + int check = checkScheduleTime(schedule); + if(check == 0){ + return 0; + } + return createComplementCommandList(schedule, runMode, command, expectedParallelismNumber, complementDependentMode); } else { command.setCommandParam(JSONUtils.toJsonString(cmdParam)); return processService.createCommand(command); @@ -686,89 +704,117 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * create complement command * close left and close right * - * @param start - * @param end + * @param scheduleTimeParam * @param runMode * @return */ - protected int createComplementCommandList(Date start, Date end, RunMode runMode, Command command, + protected int createComplementCommandList(String scheduleTimeParam, RunMode runMode, Command command, Integer expectedParallelismNumber, ComplementDependentMode complementDependentMode) { int createCount = 0; + String startDate = null; + String endDate = null; + String dateList = null; int dependentProcessDefinitionCreateCount = 0; - runMode = (runMode == null) ? 
RunMode.RUN_MODE_SERIAL : runMode; Map cmdParam = JSONUtils.toMap(command.getCommandParam()); + Map scheduleParam = JSONUtils.toMap(scheduleTimeParam); + if(scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){ + dateList = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST); + dateList = removeDuplicates(dateList); + } + if(scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE) && scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_END_DATE)){ + startDate = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE); + endDate = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE); + } switch (runMode) { case RUN_MODE_SERIAL: { - if (start.after(end)) { - logger.warn("The startDate {} is later than the endDate {}", start, end); - break; + if(StringUtils.isNotEmpty(dateList)){ + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, dateList); + command.setCommandParam(JSONUtils.toJsonString(cmdParam)); + createCount = processService.createCommand(command); } - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start)); - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end)); - command.setCommandParam(JSONUtils.toJsonString(cmdParam)); - createCount = processService.createCommand(command); - - // dependent process definition - List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode()); - - if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) { - logger.info("process code: {} complement dependent in off mode or schedule's size is 0, skip " - + "dependent complement data", command.getProcessDefinitionCode()); - } else { - dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command); + if(startDate != null && endDate != null){ + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startDate); + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endDate); + 
command.setCommandParam(JSONUtils.toJsonString(cmdParam)); + createCount = processService.createCommand(command); + + // dependent process definition + List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode()); + + if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) { + logger.info("process code: {} complement dependent in off mode or schedule's size is 0, skip " + + "dependent complement data", command.getProcessDefinitionCode()); + } else { + dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command); + } } - break; } case RUN_MODE_PARALLEL: { - if (start.after(end)) { - logger.warn("The startDate {} is later than the endDate {}", start, end); - break; - } - - List listDate = new ArrayList<>(); - List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode()); - listDate.addAll(CronUtils.getSelfFireDateList(start, end, schedules)); - int listDateSize = listDate.size(); - createCount = listDate.size(); - if (!CollectionUtils.isEmpty(listDate)) { - if (expectedParallelismNumber != null && expectedParallelismNumber != 0) { - createCount = Math.min(listDate.size(), expectedParallelismNumber); - if (listDateSize < createCount) { - createCount = listDateSize; + if(startDate != null && endDate != null){ + List listDate = new ArrayList<>(); + List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode()); + listDate.addAll(CronUtils.getSelfFireDateList(DateUtils.getScheduleDate(startDate), DateUtils.getScheduleDate(endDate), schedules)); + int listDateSize = listDate.size(); + createCount = listDate.size(); + if (!CollectionUtils.isEmpty(listDate)) { + if (expectedParallelismNumber != null && expectedParallelismNumber != 0) { + createCount = Math.min(listDate.size(), expectedParallelismNumber); + if (listDateSize < createCount) { + 
createCount = listDateSize; + } + } + logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount); + + // Distribute the number of tasks equally to each command. + // The last command with insufficient quantity will be assigned to the remaining tasks. + int itemsPerCommand = (listDateSize / createCount); + int remainingItems = (listDateSize % createCount); + int startDateIndex = 0; + int endDateIndex = 0; + + for (int i = 1; i <= createCount; i++) { + int extra = (i <= remainingItems) ? 1 : 0; + int singleCommandItems = (itemsPerCommand + extra); + + if (i == 1) { + endDateIndex += singleCommandItems - 1; + } else { + startDateIndex = endDateIndex + 1; + endDateIndex += singleCommandItems; + } + + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(listDate.get(startDateIndex))); + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(listDate.get(endDateIndex))); + command.setCommandParam(JSONUtils.toJsonString(cmdParam)); + processService.createCommand(command); + + if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) { + logger.info("process code: {} complement dependent in off mode or schedule's size is 0, skip " + + "dependent complement data", command.getProcessDefinitionCode()); + } else { + dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command); + } } } - logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount); - - // Distribute the number of tasks equally to each command. - // The last command with insufficient quantity will be assigned to the remaining tasks. - int itemsPerCommand = (listDateSize / createCount); - int remainingItems = (listDateSize % createCount); - int startDateIndex = 0; - int endDateIndex = 0; - - for (int i = 1; i <= createCount; i++) { - int extra = (i <= remainingItems) ? 
1 : 0; - int singleCommandItems = (itemsPerCommand + extra); - - if (i == 1) { - endDateIndex += singleCommandItems - 1; - } else { - startDateIndex = endDateIndex + 1; - endDateIndex += singleCommandItems; + } + if(StringUtils.isNotEmpty(dateList)){ + List listDate = Arrays.asList(dateList.split(COMMA)); + int listDateSize = listDate.size(); + createCount = listDate.size(); + if (!CollectionUtils.isEmpty(listDate)) { + if (expectedParallelismNumber != null && expectedParallelismNumber != 0) { + createCount = Math.min(listDate.size(), expectedParallelismNumber); + if (listDateSize < createCount) { + createCount = listDateSize; + } } - - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(listDate.get(startDateIndex))); - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(listDate.get(endDateIndex))); - command.setCommandParam(JSONUtils.toJsonString(cmdParam)); - processService.createCommand(command); - - if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) { - logger.info("process code: {} complement dependent in off mode or schedule's size is 0, skip " - + "dependent complement data", command.getProcessDefinitionCode()); - } else { - dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command); + logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount); + for (List stringDate : Lists.partition(listDate, createCount)) { + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, String.join(COMMA, stringDate)); + command.setCommandParam(JSONUtils.toJsonString(cmdParam)); + processService.createCommand(command); } } } @@ -854,4 +900,60 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ return validDependentProcessDefinitionList; } + + /** + * + * @param schedule + * @return check error return 0 otherwish 1 + */ + private int checkScheduleTime(String schedule){ + Date start = null; + Date end = null; + 
Map scheduleResult = JSONUtils.toMap(schedule); + if(scheduleResult == null){ + return 0; + } + if(scheduleResult.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){ + if(scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST) == null){ + return 0; + } + } + if(scheduleResult.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)){ + String startDate = scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_START_DATE); + String endDate = scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_END_DATE); + if (startDate == null || endDate == null) { + return 0; + } + start = DateUtils.getScheduleDate(startDate); + end = DateUtils.getScheduleDate(endDate); + if(start == null || end == null){ + return 0; + } + if (start.after(end)) { + logger.error("complement data error, wrong date start:{} and end date:{} ", + start, end + ); + return 0; + } + } + return 1; + } + + /** + * + * @param scheduleTimeList + * @return remove duplicate date list + */ + private String removeDuplicates(String scheduleTimeList){ + HashSet removeDate = new HashSet(); + List resultList = new ArrayList(); + if(StringUtils.isNotEmpty(scheduleTimeList)){ + String[] dateArrays = scheduleTimeList.split(COMMA); + List dateList = Arrays.asList(dateArrays); + removeDate.addAll(dateList); + resultList.addAll(removeDate); + return String.join(COMMA, resultList); + } + return null; + } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java index 5d170c499..44e149285 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java @@ -199,7 +199,7 @@ public class ExecutorServiceTest { 
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList()); Map result = executorService.execProcessInstance(loginUser, projectCode, - processDefinitionCode, cronTime, CommandType.START_PROCESS, + processDefinitionCode, "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", CommandType.START_PROCESS, null, null, null, null, 0, RunMode.RUN_MODE_SERIAL, @@ -218,7 +218,7 @@ public class ExecutorServiceTest { Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList()); Map result = executorService.execProcessInstance(loginUser, projectCode, - processDefinitionCode, cronTime, CommandType.START_PROCESS, + processDefinitionCode, "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", CommandType.START_PROCESS, null, "n1,n2", null, null, 0, RunMode.RUN_MODE_SERIAL, @@ -237,7 +237,7 @@ public class ExecutorServiceTest { Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList()); Map result = executorService.execProcessInstance(loginUser, projectCode, - processDefinitionCode, "2020-01-31 23:00:00,2020-01-01 00:00:00", CommandType.COMPLEMENT_DATA, + processDefinitionCode, "{\"complementStartDate\":\"2022-01-07 12:12:12\",\"complementEndDate\":\"2022-01-06 12:12:12\"}", CommandType.COMPLEMENT_DATA, null, null, null, null, 0, RunMode.RUN_MODE_SERIAL, @@ -255,7 +255,7 @@ public class ExecutorServiceTest { Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList()); Map result = executorService.execProcessInstance(loginUser, projectCode, - processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA, + processDefinitionCode, "{\"complementStartDate\":\"2020-01-01 
00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", CommandType.COMPLEMENT_DATA, null, null, null, null, 0, RunMode.RUN_MODE_SERIAL, @@ -273,7 +273,7 @@ public class ExecutorServiceTest { Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList()); Map result = executorService.execProcessInstance(loginUser, projectCode, - processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA, + processDefinitionCode, "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", CommandType.COMPLEMENT_DATA, null, null, null, null, 0, RunMode.RUN_MODE_PARALLEL, @@ -292,7 +292,7 @@ public class ExecutorServiceTest { Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(oneSchedulerList()); Map result = executorService.execProcessInstance(loginUser, projectCode, - processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA, + processDefinitionCode, "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", CommandType.COMPLEMENT_DATA, null, null, null, null, 0, RunMode.RUN_MODE_PARALLEL, @@ -308,7 +308,7 @@ public class ExecutorServiceTest { Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(new ArrayList<>()); Map result = executorService.execProcessInstance(loginUser, projectCode, - processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA, + processDefinitionCode, "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", CommandType.COMPLEMENT_DATA, null, null, null, null, 0, RunMode.RUN_MODE_PARALLEL, diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index b908cd233..90820f6d4 100644 --- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -361,6 +361,11 @@ public final class Constants { */ public static final String CMDPARAM_COMPLEMENT_DATA_END_DATE = "complementEndDate"; + /** + * complement data Schedule date + */ + public static final String CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST = "complementScheduleDateList"; + /** * complement date default cron string */ @@ -809,4 +814,10 @@ public final class Constants { * tenant */ public static final int TENANT_FULL_NAME_MAX_LENGTH = 30; + + /** + * schedule time the amount of date data is too large, affecting the memory, so set 100 + */ + public static final int SCHEDULE_TIME_MAX_LENGTH = 100; + } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index f9caf000d..3365e67c3 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -17,16 +17,10 @@ package org.apache.dolphinscheduler.server.master.runner; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES; -import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; -import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING; -import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR; -import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN; - +import com.google.common.collect.Lists; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.FailureStrategy; @@ -75,10 +69,8 @@ import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.corn.CronUtils; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; - -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.math.NumberUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; @@ -96,10 +88,18 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; +import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES; +import static org.apache.dolphinscheduler.common.Constants.COMMA; +import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; +import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING; +import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR; +import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN; /** * Workflow execute task, used to execute a workflow instance. @@ -282,14 +282,14 @@ public class WorkflowExecuteRunnable implements Runnable { public String getKey() { if (StringUtils.isNotEmpty(key) - || this.processDefinition == null) { + || this.processDefinition == null) { return key; } key = String.format("%d_%d_%d", - this.processDefinition.getCode(), - this.processDefinition.getVersion(), - this.processInstance.getId()); + this.processDefinition.getCode(), + this.processDefinition.getVersion(), + this.processInstance.getId()); return key; } @@ -503,7 +503,7 @@ public class WorkflowExecuteRunnable implements Runnable { } else { ProcessInstance processInstance = this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId()); this.processService.sendStartTask2Master(processInstance, nextTaskInstance.getId(), - org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST); + org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST); } } } @@ -526,12 +526,12 @@ public class WorkflowExecuteRunnable implements Runnable { waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance); if (!taskInstance.retryTaskIntervalOverTime()) { logger.info("failure task will be submitted: process id: {}, task instance code: {} state:{} retry times:{} / {}, interval:{}", - processInstance.getId(), - newTaskInstance.getTaskCode(), - 
newTaskInstance.getState(), - newTaskInstance.getRetryTimes(), - newTaskInstance.getMaxRetryTimes(), - newTaskInstance.getRetryInterval()); + processInstance.getId(), + newTaskInstance.getTaskCode(), + newTaskInstance.getState(), + newTaskInstance.getRetryTimes(), + newTaskInstance.getMaxRetryTimes(), + newTaskInstance.getRetryInterval()); stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, newTaskInstance); stateWheelExecuteThread.addTask4RetryCheck(processInstance, newTaskInstance); } else { @@ -562,7 +562,7 @@ public class WorkflowExecuteRunnable implements Runnable { logger.info("process instance update: {}", processInstanceId); processInstance = processService.findProcessInstanceById(processInstanceId); processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), - processInstance.getProcessDefinitionVersion()); + processInstance.getProcessDefinitionVersion()); processInstance.setProcessDefinition(processDefinition); } @@ -591,8 +591,8 @@ public class WorkflowExecuteRunnable implements Runnable { public boolean checkProcessInstance(StateEvent stateEvent) { if (this.processInstance.getId() != stateEvent.getProcessInstanceId()) { logger.error("mismatch process instance id: {}, state event:{}", - this.processInstance.getId(), - stateEvent); + this.processInstance.getId(), + stateEvent); return false; } return true; @@ -774,7 +774,12 @@ public class WorkflowExecuteRunnable implements Runnable { if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING)) { cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING); } - cmdParam.replace(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.format(scheduleDate, "yyyy-MM-dd HH:mm:ss", null)); + if(cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){ + cmdParam.replace(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, 
cmdParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST).substring(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST).indexOf(COMMA)+1)); + } + if(cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)){ + cmdParam.replace(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.format(scheduleDate, YYYY_MM_DD_HH_MM_SS, null)); + } command.setCommandParam(JSONUtils.toJsonString(cmdParam)); command.setTaskDependType(processInstance.getTaskDependType()); command.setFailureStrategy(processInstance.getFailureStrategy()); @@ -794,7 +799,7 @@ public class WorkflowExecuteRunnable implements Runnable { private boolean needComplementProcess() { if (processInstance.isComplementData() - && Flag.NO == processInstance.getIsSubProcess()) { + && Flag.NO == processInstance.getIsSubProcess()) { return true; } return false; @@ -874,7 +879,7 @@ public class WorkflowExecuteRunnable implements Runnable { return; } processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), - processInstance.getProcessDefinitionVersion()); + processInstance.getProcessDefinitionVersion()); processInstance.setProcessDefinition(processDefinition); List recoverNodeList = getStartTaskInstanceList(processInstance.getCommandParam()); @@ -894,7 +899,7 @@ public class WorkflowExecuteRunnable implements Runnable { List recoveryNodeCodeList = getRecoveryNodeCodeList(recoverNodeList); List startNodeNameList = parseStartNodeName(processInstance.getCommandParam()); ProcessDag processDag = generateFlowDag(taskNodeList, - startNodeNameList, recoveryNodeCodeList, processInstance.getTaskDependType()); + startNodeNameList, recoveryNodeCodeList, processInstance.getTaskDependType()); if (processDag == null) { logger.error("processDag is null"); return; @@ -956,15 +961,24 @@ public class WorkflowExecuteRunnable implements Runnable { if (processInstance.isComplementData() && complementListDate.isEmpty()) { Map cmdParam = JSONUtils.toMap(processInstance.getCommandParam()); - if 
(cmdParam != null && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) { + if (cmdParam != null) { // reset global params while there are start parameters setGlobalParamIfCommanded(processDefinition, cmdParam); - Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE)); - Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); + Date start = null; + Date end = null; + if(cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE) && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_END_DATE)){ + start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE)); + end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); + } List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode()); if (complementListDate.isEmpty() && needComplementProcess()) { - complementListDate = CronUtils.getSelfFireDateList(start, end, schedules); + if(start != null && end != null){ + complementListDate = CronUtils.getSelfFireDateList(start, end, schedules); + } + if(cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){ + complementListDate = CronUtils.getSelfScheduleDateList(cmdParam); + } logger.info(" process definition code:{} complement data: {}", processInstance.getProcessDefinitionCode(), complementListDate); @@ -996,15 +1010,15 @@ public class WorkflowExecuteRunnable implements Runnable { taskProcessor.init(taskInstance, processInstance); if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION - && taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) { + && taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) { notifyProcessHostUpdate(taskInstance); } boolean submit = taskProcessor.action(TaskAction.SUBMIT); if (!submit) { logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!", - processInstance.getId(), processInstance.getName(), - 
taskInstance.getId(), taskInstance.getName()); + processInstance.getId(), processInstance.getName(), + taskInstance.getId(), taskInstance.getName()); return null; } @@ -1475,10 +1489,10 @@ public class WorkflowExecuteRunnable implements Runnable { */ private ExecutionStatus runningState(ExecutionStatus state) { if (state == ExecutionStatus.READY_STOP - || state == ExecutionStatus.READY_PAUSE - || state == ExecutionStatus.WAITING_THREAD - || state == ExecutionStatus.READY_BLOCK - || state == ExecutionStatus.DELAY_EXECUTION) { + || state == ExecutionStatus.READY_PAUSE + || state == ExecutionStatus.WAITING_THREAD + || state == ExecutionStatus.READY_BLOCK + || state == ExecutionStatus.DELAY_EXECUTION) { // if the running task is not completed, the state remains unchanged return state; } else { @@ -1514,8 +1528,8 @@ public class WorkflowExecuteRunnable implements Runnable { } if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) { return readyToSubmitTaskQueue.size() == 0 - && activeTaskProcessorMaps.size() == 0 - && waitToRetryTaskInstanceMap.size() == 0; + && activeTaskProcessorMaps.size() == 0 + && waitToRetryTaskInstanceMap.size() == 0; } } return false; @@ -1546,9 +1560,9 @@ public class WorkflowExecuteRunnable implements Runnable { List pauseList = getCompleteTaskByState(ExecutionStatus.PAUSE); if (CollectionUtils.isNotEmpty(pauseList) - || processInstance.isBlocked() - || !isComplementEnd() - || readyToSubmitTaskQueue.size() > 0) { + || processInstance.isBlocked() + || !isComplementEnd() + || readyToSubmitTaskQueue.size() > 0) { return ExecutionStatus.PAUSE; } else { return ExecutionStatus.SUCCESS; @@ -1613,9 +1627,9 @@ public class WorkflowExecuteRunnable implements Runnable { List killList = getCompleteTaskByState(ExecutionStatus.KILL); List failList = getCompleteTaskByState(ExecutionStatus.FAILURE); if (CollectionUtils.isNotEmpty(stopList) - || CollectionUtils.isNotEmpty(killList) - || CollectionUtils.isNotEmpty(failList) - || 
!isComplementEnd()) { + || CollectionUtils.isNotEmpty(killList) + || CollectionUtils.isNotEmpty(failList) + || !isComplementEnd()) { return ExecutionStatus.STOP; } else { return ExecutionStatus.SUCCESS; @@ -1673,10 +1687,10 @@ public class WorkflowExecuteRunnable implements Runnable { ExecutionStatus state = getProcessInstanceState(processInstance); if (processInstance.getState() != state) { logger.info( - "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}", - processInstance.getId(), processInstance.getName(), - processInstance.getState(), state, - processInstance.getCommandType()); + "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}", + processInstance.getId(), processInstance.getName(), + processInstance.getState(), state, + processInstance.getCommandType()); processInstance.setState(state); if (state.typeIsFinished()) { @@ -1701,10 +1715,10 @@ public class WorkflowExecuteRunnable implements Runnable { ExecutionStatus state = stateEvent.getExecutionStatus(); if (processInstance.getState() != state) { logger.info( - "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}", - processInstance.getId(), processInstance.getName(), - processInstance.getState(), state, - processInstance.getCommandType()); + "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}", + processInstance.getId(), processInstance.getName(), + processInstance.getState(), state, + processInstance.getCommandType()); processInstance.setState(state); if (state.typeIsFinished()) { @@ -1751,14 +1765,14 @@ public class WorkflowExecuteRunnable implements Runnable { */ private void removeTaskFromStandbyList(TaskInstance taskInstance) { logger.info("remove task from stand by list, id: {} name:{}", - taskInstance.getId(), - taskInstance.getName()); + taskInstance.getId(), + taskInstance.getName()); try { readyToSubmitTaskQueue.remove(taskInstance); } catch (Exception 
e) { logger.error("remove task instance from readyToSubmitTaskQueue error, task id:{}, Name: {}", - taskInstance.getId(), - taskInstance.getName(), e); + taskInstance.getId(), + taskInstance.getName(), e); } } @@ -1781,7 +1795,7 @@ public class WorkflowExecuteRunnable implements Runnable { */ private void killAllTasks() { logger.info("kill called on process instance id: {}, num: {}", processInstance.getId(), - activeTaskProcessorMaps.size()); + activeTaskProcessorMaps.size()); if (readyToSubmitTaskQueue.size() > 0) { readyToSubmitTaskQueue.clear(); @@ -2070,4 +2084,4 @@ public class WorkflowExecuteRunnable implements Runnable { break; } } -} +} \ No newline at end of file diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CronUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CronUtils.java index f1ed739da..376e53384 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CronUtils.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CronUtils.java @@ -17,22 +17,19 @@ package org.apache.dolphinscheduler.service.corn; -import static org.apache.dolphinscheduler.service.corn.CycleFactory.day; -import static org.apache.dolphinscheduler.service.corn.CycleFactory.hour; -import static org.apache.dolphinscheduler.service.corn.CycleFactory.min; -import static org.apache.dolphinscheduler.service.corn.CycleFactory.month; -import static org.apache.dolphinscheduler.service.corn.CycleFactory.week; -import static org.apache.dolphinscheduler.service.corn.CycleFactory.year; - -import static com.cronutils.model.CronType.QUARTZ; - +import com.cronutils.model.Cron; +import com.cronutils.model.definition.CronDefinitionBuilder; +import com.cronutils.parser.CronParser; +import org.apache.commons.collections.CollectionUtils; import org.apache.dolphinscheduler.common.Constants; import 
org.apache.dolphinscheduler.common.enums.CycleEnum; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.dao.entity.Schedule; - -import org.apache.commons.collections.CollectionUtils; +import org.apache.dolphinscheduler.spi.utils.StringUtils; +import org.quartz.CronExpression; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.text.ParseException; import java.util.ArrayList; @@ -41,14 +38,17 @@ import java.util.Collections; import java.util.Date; import java.util.GregorianCalendar; import java.util.List; +import java.util.Map; -import org.quartz.CronExpression; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cronutils.model.Cron; -import com.cronutils.model.definition.CronDefinitionBuilder; -import com.cronutils.parser.CronParser; +import static com.cronutils.model.CronType.QUARTZ; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; +import static org.apache.dolphinscheduler.common.Constants.COMMA; +import static org.apache.dolphinscheduler.service.corn.CycleFactory.day; +import static org.apache.dolphinscheduler.service.corn.CycleFactory.hour; +import static org.apache.dolphinscheduler.service.corn.CycleFactory.min; +import static org.apache.dolphinscheduler.service.corn.CycleFactory.month; +import static org.apache.dolphinscheduler.service.corn.CycleFactory.week; +import static org.apache.dolphinscheduler.service.corn.CycleFactory.year; /** * // todo: this utils is heavy, it rely on quartz and corn-utils. 
@@ -283,4 +283,21 @@ public class CronUtils { return end.getTime(); } + /** + * get Schedule Date + * @param param + * @return date list + */ + public static List getSelfScheduleDateList(Map param){ + List result = new ArrayList<>(); + String scheduleDates = param.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST); + if(StringUtils.isNotEmpty(scheduleDates)){ + for (String stringDate : scheduleDates.split(COMMA)) { + result.add(DateUtils.stringToDate(stringDate)); + } + return result; + } + return null; + } + } -- GitLab