diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py index 649e2ce51ab4f41ae1192691385e6683b5c7cd38..dbf2c4179597351827d41b46aacdb53fa357289d 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py @@ -350,14 +350,16 @@ class ProcessDefinition(Base): This method should be called before process definition submit to java gateway For now, we have below checker: - * `self.param` should be set if task `switch` in this workflow. + * `self.param` or at least one local param of task should be set if task `switch` in this workflow. """ if ( any([task.task_type == TaskType.SWITCH for task in self.tasks.values()]) and self.param is None + and all([len(task.local_params) == 0 for task in self.tasks.values()]) ): raise PyDSParamException( - "Parameter param must be provider if task Switch in process definition." + "Parameter param or at least one local_param of task must " + "be provider if task Switch in process definition." ) def submit(self) -> int: diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py index 28032f88e7bd93025bae4967683d510c8998c9d7..0c9a2b82b3df5ccbfc979426895a8757abd3daf9 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py @@ -129,7 +129,11 @@ class SwitchCondition(Base): class Switch(Task): - """Task switch object, declare behavior for switch task to dolphinscheduler.""" + """Task switch object, declare behavior for switch task to dolphinscheduler. + + Param of process definition or at least one local param of task must be set + if task `switch` in this workflow. + """ def __init__(self, name: str, condition: SwitchCondition, *args, **kwargs): super().__init__(name, TaskType.SWITCH, *args, **kwargs) diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py index 88028f72c71b8845921d128a8a0ddf322eb2b364..36e1cb035eeec6cb6c357f593dd7d90e551e9ccc 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py @@ -240,11 +240,38 @@ def test__pre_submit_check_switch_without_param(mock_code_version): parent >> switch with pytest.raises( PyDSParamException, - match="Parameter param must be provider if task Switch in process definition.", + match="Parameter param or at least one local_param of task must " + "be provider if task Switch in process definition.", ): pd._pre_submit_check() +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(123, 1), +) +def test__pre_submit_check_switch_with_local_params(mock_code_version): + """Test :func:`_pre_submit_check` if process definition with switch with local params of task.""" + with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd: + parent = Task( + name="parent", + task_type=TEST_TASK_TYPE, + local_params=[ + {"prop": "var", "direct": "OUT", "type": "VARCHAR", "value": ""} + ], + ) + switch_child_1 = Task(name="switch_child_1", task_type=TEST_TASK_TYPE) + switch_child_2 = Task(name="switch_child_2", task_type=TEST_TASK_TYPE) + switch_condition = SwitchCondition( + Branch(condition="${var} > 1", task=switch_child_1), + Default(task=switch_child_2), + ) + + switch = Switch(name="switch", condition=switch_condition) + parent >> switch + pd._pre_submit_check() + + def test_process_definition_get_define_without_task(): """Test process definition function get_define without task.""" expect = {