diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py index 9bc8d459e988c03a8f525ad47aa25bdf5a555ae8..599b9793698265326d7d2930698f9c9081c8a980 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py @@ -17,7 +17,7 @@ """DolphinScheduler Task and TaskRelation object.""" -import logging +from logging import getLogger from typing import Dict, List, Optional, Sequence, Set, Tuple, Union from pydolphinscheduler.constants import ( @@ -34,6 +34,8 @@ from pydolphinscheduler.core.process_definition import ( ) from pydolphinscheduler.java_gateway import launch_gateway +logger = getLogger(__name__) + class TaskRelation(Base): """TaskRelation object, describe the relation of exactly two tasks.""" @@ -146,7 +148,7 @@ class Task(Base): ): self.process_definition.add_task(self) else: - logging.warning( + logger.warning( "Task code %d already in process definition, prohibit re-add task.", self.code, ) diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py index 6af731b5ff7179e45706e4d4005f9075739f3bfa..7d4bbebdd33a2067281f0478d7ad776c571c25c4 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py @@ -16,13 +16,16 @@ # under the License. """Test Task class function.""" - +import logging +import re from unittest.mock import patch import pytest +from pydolphinscheduler.core.process_definition import ProcessDefinition from pydolphinscheduler.core.task import Task, TaskRelation from tests.testing.task import Task as testTask +from tests.testing.task import TaskWithCode TEST_TASK_RELATION_SET = set() TEST_TASK_RELATION_SIZE = 0 @@ -222,3 +225,19 @@ def test_tasks_list_shift(dep_expr: str, flag: str): assert all([1 == len(getattr(t, reverse_direction_attr)) for t in tasks]) assert all([task.code in getattr(t, reverse_direction_attr) for t in tasks]) + + +def test_add_duplicate(caplog): + """Test add task which code already in process definition.""" + with ProcessDefinition("test_add_duplicate_workflow") as _: + TaskWithCode(name="test_task_1", task_type="test", code=123, version=1) + with caplog.at_level(logging.WARNING): + TaskWithCode( + name="test_task_duplicate_code", task_type="test", code=123, version=2 + ) + assert all( + [ + caplog.text.startswith("WARNING pydolphinscheduler"), + re.findall("already in process definition", caplog.text), + ] + ) diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/testing/task.py b/dolphinscheduler-python/pydolphinscheduler/tests/testing/task.py index e0affc9f851f39d3d4dfa6dc5f8457cf5e471edd..11ffbf1e6f390fb0576e4eb2413a7a722531144e 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/testing/task.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/testing/task.py @@ -30,3 +30,18 @@ class Task(SourceTask): def gen_code_and_version(self): """Mock java gateway code and version, convenience method for unittest.""" return uuid.uuid1().time, self.DEFAULT_VERSION + + +class TaskWithCode(SourceTask): + """Mock class :class:`pydolphinscheduler.core.task.Task` and it return some code and version.""" + + def __init__( + self, name: str, task_type: str, code: int, version: int, *args, **kwargs + ): + self._constant_code = code + self._constant_version = version + super().__init__(name, task_type, *args, **kwargs) + + def gen_code_and_version(self): + """Mock java gateway code and version, convenience method for unittest.""" + return self._constant_code, self._constant_version