diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py b/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py
index b47b8e3bfdd030156ca72c2d7ec2243329e1f050..2b30d0a98c55f53925654820d4ab9cae338392bd 100644
--- a/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py
@@ -34,8 +34,7 @@ from pydolphinscheduler.tasks.shell import Shell
 from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition
 
 with ProcessDefinition(
-    name="task_switch_example",
-    tenant="tenant_exists",
+    name="task_switch_example", tenant="tenant_exists", param={"var": "1"}
 ) as pd:
     parent = Shell(name="parent", command="echo parent")
     switch_child_1 = Shell(name="switch_child_1", command="echo switch_child_1")
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
index 70d4e6b8e0aee3c78a0f06b944e01f7cf2138547..4941a85a0086f8ae06462277da2f51abbece7745 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
@@ -24,6 +24,7 @@ from typing import Any, Dict, List, Optional, Set
 from pydolphinscheduler.constants import (
     ProcessDefinitionDefault,
     ProcessDefinitionReleaseState,
+    TaskType,
 )
 from pydolphinscheduler.core.base import Base
 from pydolphinscheduler.exceptions import PyDSParamException, PyDSTaskNoFoundException
@@ -97,7 +98,7 @@ class ProcessDefinition(Base):
         worker_group: Optional[str] = ProcessDefinitionDefault.WORKER_GROUP,
         timeout: Optional[int] = 0,
         release_state: Optional[str] = ProcessDefinitionReleaseState.ONLINE,
-        param: Optional[List] = None,
+        param: Optional[Dict] = None,
     ):
         super().__init__(name, description)
         self.schedule = schedule
@@ -189,6 +190,25 @@ class ProcessDefinition(Base):
         """Set attribute end_time."""
         self._end_time = val
 
+    @property
+    def param_json(self) -> Optional[List[Dict]]:
+        """Return ``self.param`` as the list of dict submitted to java gateway.
+
+        Each ``key: value`` pair becomes one entry with direction ``IN`` and type
+        ``VARCHAR``. Return ``None`` when ``self.param`` is ``None`` or empty.
+        """
+        if not self.param:
+            return None
+        return [
+            {
+                "prop": k,
+                "direct": "IN",
+                "type": "VARCHAR",
+                "value": v,
+            }
+            for k, v in self.param.items()
+        ]
+
     @property
     def task_definition_json(self) -> List[Dict]:
         """Return all tasks definition in list of dict."""
@@ -323,16 +343,39 @@ class ProcessDefinition(Base):
         # Project model need User object exists
         self.project.create_if_not_exists(self._user)
 
+    def _pre_submit_check(self):
+        """Check conditions that must hold before submitting to java gateway.
+
+        Call this before the process definition is submitted to java gateway.
+        For now, we have the checker below:
+
+        * ``self.param`` must be set (neither ``None`` nor empty) if the workflow
+          contains a task of type ``TaskType.SWITCH``.
+
+        :raises PyDSParamException: when a switch task exists without ``self.param``.
+        """
+        has_switch = any(
+            task.task_type == TaskType.SWITCH for task in self.tasks.values()
+        )
+        # Empty dict is rejected too, because ``param_json`` serializes it as no param
+        if has_switch and not self.param:
+            raise PyDSParamException(
+                "Parameter param must be provider if task Switch in process definition."
+            )
+
     def submit(self) -> int:
         """Submit ProcessDefinition instance to java gateway."""
         self._ensure_side_model_exists()
+        # Validate on the client side before the gateway is launched
+        self._pre_submit_check()
+
         gateway = launch_gateway()
         self._process_definition_code = gateway.entry_point.createOrUpdateProcessDefinition(
             self._user,
             self._project,
             self.name,
             str(self.description) if self.description else "",
-            str(self.param) if self.param else None,
+            json.dumps(self.param_json) if self.param_json else None,
             json.dumps(self.schedule_json) if self.schedule_json else None,
             json.dumps(self.task_location),
             self.timeout,
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
index 8491878ea5dbc9ded8e3c2f296066c7618ece35d..694f9e43bafae5c343c417555f406964d1bc265d 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
@@ -19,6 +19,7 @@
 
 from datetime import datetime
 from typing import Any
+from unittest.mock import patch
 
 import pytest
 from freezegun import freeze_time
@@ -30,10 +31,12 @@ from pydolphinscheduler.constants import (
 from pydolphinscheduler.core.process_definition import ProcessDefinition
 from pydolphinscheduler.exceptions import PyDSParamException
 from pydolphinscheduler.side import Project, Tenant, User
+from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition
 from pydolphinscheduler.utils.date import conv_to_schedule
 from tests.testing.task import Task
 
 TEST_PROCESS_DEFINITION_NAME = "simple-test-process-definition"
+TEST_TASK_TYPE = "test-task-type"
 
 
 @pytest.mark.parametrize("func", ["run", "submit", "start"])
@@ -151,6 +154,81 @@ def test__parse_datetime_not_support_type(val: Any):
         pd._parse_datetime(val)
 
 
+@pytest.mark.parametrize(
+    "param, expect",
+    [
+        (
+            None,
+            None,
+        ),
+        (
+            {},
+            None,
+        ),
+        (
+            {"key1": "val1"},
+            [
+                {
+                    "prop": "key1",
+                    "direct": "IN",
+                    "type": "VARCHAR",
+                    "value": "val1",
+                }
+            ],
+        ),
+        (
+            {
+                "key1": "val1",
+                "key2": "val2",
+            },
+            [
+                {
+                    "prop": "key1",
+                    "direct": "IN",
+                    "type": "VARCHAR",
+                    "value": "val1",
+                },
+                {
+                    "prop": "key2",
+                    "direct": "IN",
+                    "type": "VARCHAR",
+                    "value": "val2",
+                },
+            ],
+        ),
+    ],
+)
+def test_property_param_json(param, expect):
+    """Test ProcessDefinition's property param_json."""
+    pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, param=param)
+    assert pd.param_json == expect
+
+
+@pytest.mark.parametrize("param", [None, {}])
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+def test__pre_submit_check_switch_without_param(mock_code_version, param):
+    """Test :func:`_pre_submit_check` raises if switch exists but param is unset."""
+    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, param=param) as pd:
+        parent = Task(name="parent", task_type=TEST_TASK_TYPE)
+        switch_child_1 = Task(name="switch_child_1", task_type=TEST_TASK_TYPE)
+        switch_child_2 = Task(name="switch_child_2", task_type=TEST_TASK_TYPE)
+        switch_condition = SwitchCondition(
+            Branch(condition="${var} > 1", task=switch_child_1),
+            Default(task=switch_child_2),
+        )
+
+        switch = Switch(name="switch", condition=switch_condition)
+        parent >> switch
+        with pytest.raises(
+            PyDSParamException,
+            match="Parameter param must be provider if task Switch in process definition.",
+        ):
+            pd._pre_submit_check()
+
+
 def test_process_definition_get_define_without_task():
     """Test process definition function get_define without task."""
     expect = {