未验证 提交 f2aef53a 编写于 作者: J Jiajie Zhong 提交者: GitHub

Fix bug in python example (#7681)

Fix example bug in switch and datax task type.
Fix misunderstanding for condition node

* [python] Fix switch example workflow name conflict to dependent

* [python] Fix task condition missing branch success and fail

* [python] Task datax add more detail example
上级 54525806
......@@ -22,11 +22,11 @@ This example will create five task in single workflow, with four shell task and
condition have one upstream which we declare explicit with syntax `parent >> condition`, and three downstream
automatically set dependence by condition task by passing parameter `condition`. The graph of this workflow
like:
pre_task_success_1 ->
\
pre_task_success_2 -> --> conditions -> end
/
pre_task_fail ->
pre_task_1 -> -> success_branch
\ /
pre_task_2 -> -> conditions ->
/ \
pre_task_3 -> -> fail_branch
.
"""
......@@ -34,23 +34,24 @@ from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.condition import FAILURE, SUCCESS, And, Conditions
from pydolphinscheduler.tasks.shell import Shell
with ProcessDefinition(name="task_conditions_example", tenant="tenant_exists") as pd:
condition_pre_task_1 = Shell(
name="pre_task_success_1", command="echo pre_task_success_1"
)
condition_pre_task_2 = Shell(
name="pre_task_success_2", command="echo pre_task_success_2"
)
condition_pre_task_3 = Shell(name="pre_task_fail", command="echo pre_task_fail")
with ProcessDefinition(name="task_conditions_example_1", tenant="tenant_exists") as pd:
pre_task_1 = Shell(name="pre_task_1", command="echo pre_task_1")
pre_task_2 = Shell(name="pre_task_2", command="echo pre_task_2")
pre_task_3 = Shell(name="pre_task_3", command="echo pre_task_3")
cond_operator = And(
And(
SUCCESS(condition_pre_task_1, condition_pre_task_2),
FAILURE(condition_pre_task_3),
SUCCESS(pre_task_1, pre_task_2),
FAILURE(pre_task_3),
),
)
end = Shell(name="end", command="echo parent")
success_branch = Shell(name="success_branch", command="success_branch parent")
fail_branch = Shell(name="fail_branch", command="echo fail_branch")
condition = Conditions(name="conditions", condition=cond_operator)
condition >> end
condition = Conditions(
name="conditions",
condition=cond_operator,
success_task=success_branch,
failed_task=fail_branch,
)
pd.submit()
......@@ -29,10 +29,61 @@ from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.datax import CustomDataX, DataX
# datax json template
JSON_TEMPLATE = ""
JSON_TEMPLATE = {
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "usr",
"password": "pwd",
"column": [
"id",
"name",
"code",
"description"
],
"splitPk": "id",
"connection": [
{
"table": [
"source_table"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/source_db"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "usr",
"password": "pwd",
"column": [
"id",
"name"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/target_db",
"table": [
"target_table"
]
}
]
}
}
}
]
}
}
with ProcessDefinition(
name="task_datax",
name="task_datax_1",
tenant="tenant_exists",
) as pd:
# This task synchronizes the data in `t_ds_project`
......@@ -45,6 +96,7 @@ with ProcessDefinition(
target_table="target_table",
)
# you can custom json_template of datax to sync data.
task2 = CustomDataX(name="task_custom_datax", json=JSON_TEMPLATE)
# you can custom json_template of datax to sync data. This task create job
# same as task1 do
task2 = CustomDataX(name="task_custom_datax", json=str(JSON_TEMPLATE))
pd.run()
......@@ -34,7 +34,7 @@ from pydolphinscheduler.tasks.shell import Shell
from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition
with ProcessDefinition(
name="task_dependent_external",
name="task_switch_example",
tenant="tenant_exists",
) as pd:
parent = Shell(name="parent", command="echo parent")
......
......@@ -34,18 +34,14 @@ from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.shell import Shell
with ProcessDefinition(
name="tutorial",
schedule="0 0 0 * * ? *",
start_time="2021-01-01",
name="aklsfkkalsfjkol",
tenant="tenant_exists",
) as pd:
task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler")
task_child_one = Shell(name="task_child_one", command="echo 'child one'")
task_child_two = Shell(name="task_child_two", command="echo 'child two'")
task_union = Shell(name="task_union", command="echo union")
task_group = [task_child_one, task_child_two]
task_parent.set_downstream(task_group)
task_union << task_group
......
......@@ -26,6 +26,7 @@ from pydolphinscheduler.constants import (
TaskFlag,
TaskPriority,
TaskTimeoutFlag,
TaskType,
)
from pydolphinscheduler.core.base import Base
from pydolphinscheduler.core.process_definition import (
......@@ -156,7 +157,8 @@ class Task(Base):
self.resource_list = resource_list or []
self.dependence = dependence or {}
self.wait_start_timeout = wait_start_timeout or {}
self.condition_result = condition_result or self.DEFAULT_CONDITION_RESULT
if task_type != TaskType.CONDITIONS:
self.condition_result = condition_result or self.DEFAULT_CONDITION_RESULT
@property
def process_definition(self) -> Optional[ProcessDefinition]:
......
......@@ -157,9 +157,19 @@ class Or(ConditionOperator):
class Conditions(Task):
"""Task condition object, declare behavior for condition task to dolphinscheduler."""
def __init__(self, name: str, condition: ConditionOperator, *args, **kwargs):
def __init__(
self,
name: str,
condition: ConditionOperator,
success_task: Task,
failed_task: Task,
*args,
**kwargs,
):
super().__init__(name, TaskType.CONDITIONS, *args, **kwargs)
self.condition = condition
self.success_task = success_task
self.failed_task = failed_task
# Set condition tasks as current task downstream
self._set_dep()
......@@ -171,6 +181,15 @@ class Conditions(Task):
for status in cond.args:
upstream.extend(list(status.tasks))
self.set_upstream(upstream)
self.set_downstream([self.success_task, self.failed_task])
@property
def condition_result(self) -> Dict:
"""Get condition result define for java gateway."""
return {
"successNode": [self.success_task.code],
"failedNode": [self.failed_task.code],
}
@property
def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
......@@ -182,4 +201,5 @@ class Conditions(Task):
"""
params = super().task_params
params["dependence"] = self.condition.get_define()
params["conditionResult"] = self.condition_result
return params
......@@ -324,7 +324,7 @@ def test_condition_operator_set_define_attr_mix_operator(
"pydolphinscheduler.tasks.condition.Conditions.gen_code_and_version",
return_value=(123, 1),
)
def test_dependent_get_define(mock_condition_code_version, mock_task_code_version):
def test_condition_get_define(mock_condition_code_version, mock_task_code_version):
"""Test task condition :func:`get_define`."""
common_task = Task(name="common_task", task_type="test_task_condition")
cond_operator = And(
......@@ -372,7 +372,10 @@ def test_dependent_get_define(mock_condition_code_version, mock_task_code_versio
},
],
},
"conditionResult": {"successNode": [""], "failedNode": [""]},
"conditionResult": {
"successNode": [common_task.code],
"failedNode": [common_task.code],
},
"waitStartTimeout": {},
},
"flag": "YES",
......@@ -385,7 +388,9 @@ def test_dependent_get_define(mock_condition_code_version, mock_task_code_versio
"timeout": 0,
}
task = Conditions(name, condition=cond_operator)
task = Conditions(
name, condition=cond_operator, success_task=common_task, failed_task=common_task
)
assert task.get_define() == expect
......@@ -396,49 +401,60 @@ def test_dependent_get_define(mock_condition_code_version, mock_task_code_versio
def test_condition_set_dep_workflow(mock_task_code_version):
"""Test task condition set dependence in workflow level."""
with ProcessDefinition(name="test-condition-set-dep-workflow") as pd:
condition_pre_task_1 = Task(name="pre_task_success_1", task_type=TEST_TYPE)
condition_pre_task_2 = Task(name="pre_task_success_2", task_type=TEST_TYPE)
condition_pre_task_3 = Task(name="pre_task_fail", task_type=TEST_TYPE)
pre_task_1 = Task(name="pre_task_1", task_type=TEST_TYPE)
pre_task_2 = Task(name="pre_task_2", task_type=TEST_TYPE)
pre_task_3 = Task(name="pre_task_3", task_type=TEST_TYPE)
cond_operator = And(
And(
SUCCESS(condition_pre_task_1, condition_pre_task_2),
FAILURE(condition_pre_task_3),
SUCCESS(pre_task_1, pre_task_2),
FAILURE(pre_task_3),
),
)
end = Task(name="end", task_type=TEST_TYPE)
condition = Conditions(name="conditions", condition=cond_operator)
condition >> end
success_branch = Task(name="success_branch", task_type=TEST_TYPE)
fail_branch = Task(name="fail_branch", task_type=TEST_TYPE)
condition = Conditions(
name="conditions",
condition=cond_operator,
success_task=success_branch,
failed_task=fail_branch,
)
# General tasks test
assert len(pd.tasks) == 5
assert len(pd.tasks) == 6
assert sorted(pd.task_list, key=lambda t: t.name) == sorted(
[
pre_task_1,
pre_task_2,
pre_task_3,
success_branch,
fail_branch,
condition,
condition_pre_task_1,
condition_pre_task_2,
condition_pre_task_3,
end,
],
key=lambda t: t.name,
)
# Task dep test
assert end._upstream_task_codes == {condition.code}
assert condition._downstream_task_codes == {end.code}
assert success_branch._upstream_task_codes == {condition.code}
assert fail_branch._upstream_task_codes == {condition.code}
assert condition._downstream_task_codes == {
success_branch.code,
fail_branch.code,
}
# Condition task dep after ProcessDefinition function get_define called
assert condition._upstream_task_codes == {
condition_pre_task_1.code,
condition_pre_task_2.code,
condition_pre_task_3.code,
pre_task_1.code,
pre_task_2.code,
pre_task_3.code,
}
assert all(
[
child._downstream_task_codes == {condition.code}
for child in [
condition_pre_task_1,
condition_pre_task_2,
condition_pre_task_3,
pre_task_1,
pre_task_2,
pre_task_3,
]
]
)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册