提交 f5fd4097 编写于 作者: X xinwen

fix: 修复多个 AdHocExecution 在一个 celery task 执行时日志错误

上级 31886926
from django.utils.translation import ugettext_lazy as _
from common.db.models import ChoiceSet
ADMIN = 'Admin'
USER = 'User'
......
......@@ -125,3 +125,5 @@ CELERY_WORKER_REDIRECT_STDOUTS_LEVEL = "INFO"
# CELERY_WORKER_HIJACK_ROOT_LOGGER = True
# CELERY_WORKER_MAX_TASKS_PER_CHILD = 40
CELERY_TASK_SOFT_TIME_LIMIT = 3600
ANSIBLE_LOG_DIR = os.path.join(PROJECT_DIR, 'data', 'ansible')
......@@ -60,6 +60,10 @@ class CallbackMixin:
self.results_raw[t][host][task_name] = task_result
self.clean_result(t, host, task_name, task_result)
def close(self):
if hasattr(self._display, 'close'):
self._display.close()
class AdHocResultCallback(CallbackMixin, CallbackModule, CMDCallBackModule):
"""
......
import errno
import sys
import os
from ansible.utils.display import Display
from ansible.utils.color import stringc
from ansible.utils.singleton import Singleton
from .utils import get_ansible_task_log_path
class UnSingleton(Singleton):
def __init__(cls, name, bases, dct):
type.__init__(cls, name, bases, dct)
def __call__(cls, *args, **kwargs):
return type.__call__(cls, *args, **kwargs)
class AdHocDisplay(Display, metaclass=UnSingleton):
def __init__(self, execution_id, verbosity=0):
super().__init__(verbosity=verbosity)
if execution_id:
log_path = get_ansible_task_log_path(execution_id)
else:
log_path = os.devnull
self.log_file = open(log_path, mode='a')
def close(self):
self.log_file.close()
def set_cowsay_info(self):
# 中断 cowsay 的测试,会频繁开启子进程
return
def _write_to_screen(self, msg, stderr):
if not stderr:
screen = sys.stdout
else:
screen = sys.stderr
screen.write(msg)
try:
screen.flush()
except IOError as e:
# Ignore EPIPE in case fileobj has been prematurely closed, eg.
# when piping to "head -n1"
if e.errno != errno.EPIPE:
raise
def _write_to_log_file(self, msg):
# 这里先不 flush,log 文件不需要那么及时。
self.log_file.write(msg)
def display(self, msg, color=None, stderr=False, screen_only=False, log_only=False):
if color:
msg = stringc(msg, color)
if not msg.endswith(u'\n'):
msg2 = msg + u'\n'
else:
msg2 = msg
self._write_to_screen(msg2, stderr)
self._write_to_log_file(msg2)
# ~*~ coding: utf-8 ~*~
import os
import shutil
from collections import namedtuple
......@@ -18,6 +19,7 @@ from .callback import (
)
from common.utils import get_logger
from .exceptions import AnsibleError
from .display import AdHocDisplay
__all__ = ["AdHocRunner", "PlayBookRunner", "CommandRunner"]
......@@ -130,8 +132,8 @@ class AdHocRunner:
loader=self.loader, inventory=self.inventory
)
def get_result_callback(self, file_obj=None):
return self.__class__.results_callback_class()
def get_result_callback(self, execution_id=None):
return self.__class__.results_callback_class(display=AdHocDisplay(execution_id))
@staticmethod
def check_module_args(module_name, module_args=''):
......@@ -189,7 +191,7 @@ class AdHocRunner:
'ssh_args': '-C -o ControlMaster=no'
}
def run(self, tasks, pattern, play_name='Ansible Ad-hoc', gather_facts='no'):
def run(self, tasks, pattern, play_name='Ansible Ad-hoc', gather_facts='no', execution_id=None):
"""
:param tasks: [{'action': {'module': 'shell', 'args': 'ls'}, ...}, ]
:param pattern: all, *, or others
......@@ -198,7 +200,7 @@ class AdHocRunner:
:return:
"""
self.check_pattern(pattern)
self.results_callback = self.get_result_callback()
self.results_callback = self.get_result_callback(execution_id)
cleaned_tasks = self.clean_tasks(tasks)
self.set_control_master_if_need(cleaned_tasks)
context.CLIARGS = ImmutableDict(self.options)
......@@ -233,6 +235,8 @@ class AdHocRunner:
tqm.cleanup()
shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)
self.results_callback.close()
class CommandRunner(AdHocRunner):
results_callback_class = CommandResultCallback
......
from django.conf import settings
def get_ansible_task_log_path(task_id):
from ops.utils import get_task_log_path
return get_task_log_path(settings.ANSIBLE_LOG_DIR, task_id, level=3)
......@@ -15,10 +15,14 @@ from common.api import LogTailApi
from ..models import CeleryTask
from ..serializers import CeleryResultSerializer, CeleryPeriodTaskSerializer
from ..celery.utils import get_celery_task_log_path
from ..ansible.utils import get_ansible_task_log_path
from common.mixins.api import CommonApiMixin
__all__ = ['CeleryTaskLogApi', 'CeleryResultApi', 'CeleryPeriodTaskViewSet']
__all__ = [
'CeleryTaskLogApi', 'CeleryResultApi', 'CeleryPeriodTaskViewSet',
'AnsibleTaskLogApi',
]
class CeleryTaskLogApi(LogTailApi):
......@@ -57,6 +61,21 @@ class CeleryTaskLogApi(LogTailApi):
return _('Waiting task start')
class AnsibleTaskLogApi(LogTailApi):
permission_classes = (IsValidUser,)
def get_log_path(self):
new_path = get_ansible_task_log_path(self.kwargs.get('pk'))
if new_path and os.path.isfile(new_path):
return new_path
def get_no_file_message(self, request):
if self.mark == 'undefined':
return '.'
else:
return _('Waiting task start')
class CeleryResultApi(generics.RetrieveAPIView):
permission_classes = (IsValidUser,)
serializer_class = CeleryResultSerializer
......
......@@ -102,11 +102,8 @@ def get_celery_periodic_task(task_name):
def get_celery_task_log_path(task_id):
task_id = str(task_id)
rel_path = os.path.join(task_id[0], task_id[1], task_id + '.log')
path = os.path.join(settings.CELERY_LOG_DIR, rel_path)
os.makedirs(os.path.dirname(path), exist_ok=True)
return path
from ops.utils import get_task_log_path
return get_task_log_path(settings.CELERY_LOG_DIR, task_id)
def get_celery_status():
......
# Generated by Django 3.1 on 2020-12-30 12:04
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('ops', '0018_auto_20200509_1434'),
]
operations = [
migrations.AddField(
model_name='adhocexecution',
name='celery_task_id',
field=models.UUIDField(default=None, null=True),
),
]
......@@ -179,13 +179,13 @@ class AdHoc(OrgModelMixin):
def run(self):
try:
hid = current_task.request.id
if AdHocExecution.objects.filter(id=hid).exists():
hid = uuid.uuid4()
celery_task_id = current_task.request.id
except AttributeError:
hid = uuid.uuid4()
celery_task_id = None
execution = AdHocExecution(
id=hid, adhoc=self, task=self.task,
celery_task_id=celery_task_id,
adhoc=self, task=self.task,
task_display=str(self.task)[:128],
date_start=timezone.now(),
hosts_amount=self.hosts.count(),
......@@ -237,6 +237,7 @@ class AdHocExecution(OrgModelMixin):
id = models.UUIDField(default=uuid.uuid4, primary_key=True)
task = models.ForeignKey(Task, related_name='execution', on_delete=models.SET_NULL, null=True)
task_display = models.CharField(max_length=128, blank=True, default='', verbose_name=_("Task display"))
celery_task_id = models.UUIDField(default=None, null=True)
hosts_amount = models.IntegerField(default=0, verbose_name=_("Host amount"))
adhoc = models.ForeignKey(AdHoc, related_name='execution', on_delete=models.SET_NULL, null=True)
date_start = models.DateTimeField(auto_now_add=True, verbose_name=_('Start time'))
......@@ -270,6 +271,7 @@ class AdHocExecution(OrgModelMixin):
self.adhoc.tasks,
self.adhoc.pattern,
self.task.name,
execution_id=self.id
)
return result.results_raw, result.results_summary
except AnsibleError as e:
......
......@@ -22,6 +22,8 @@ urlpatterns = [
path('tasks/<uuid:pk>/run/', api.TaskRun.as_view(), name='task-run'),
path('celery/task/<uuid:pk>/log/', api.CeleryTaskLogApi.as_view(), name='celery-task-log'),
path('celery/task/<uuid:pk>/result/', api.CeleryResultApi.as_view(), name='celery-result'),
path('ansible/task/<uuid:pk>/log/', api.AnsibleTaskLogApi.as_view(), name='ansible-task-log'),
]
urlpatterns += router.urls
......
......@@ -5,5 +5,5 @@ from .. import ws
app_name = 'ops'
urlpatterns = [
path('ws/ops/tasks/log/', ws.CeleryLogWebsocket, name='task-log-ws'),
path('ws/ops/tasks/log/', ws.TaskLogWebsocket, name='task-log-ws'),
]
# ~*~ coding: utf-8 ~*~
import os
from django.utils.translation import ugettext_lazy as _
from common.utils import get_logger, get_object_or_none
......@@ -75,3 +77,10 @@ def send_server_performance_mail(path, usage, usages):
send_mail_async(subject, message, recipient_list, html_message=message)
def get_task_log_path(base_path, task_id, level=2):
task_id = str(task_id)
rel_path = os.path.join(*task_id[:level], task_id + '.log')
path = os.path.join(base_path, rel_path)
os.makedirs(os.path.dirname(path), exist_ok=True)
return path
......@@ -6,25 +6,37 @@ import json
from common.utils import get_logger
from .celery.utils import get_celery_task_log_path
from .ansible.utils import get_ansible_task_log_path
from channels.generic.websocket import JsonWebsocketConsumer
logger = get_logger(__name__)
class CeleryLogWebsocket(JsonWebsocketConsumer):
class TaskLogWebsocket(JsonWebsocketConsumer):
disconnected = False
log_types = {
'celery': get_celery_task_log_path,
'ansible': get_ansible_task_log_path
}
def connect(self):
self.accept()
def get_log_path(self, task_id):
func = self.log_types.get(self.log_type)
if func:
return func(task_id)
def receive(self, text_data=None, bytes_data=None, **kwargs):
data = json.loads(text_data)
task_id = data.get("task")
task_id = data.get('task')
self.log_type = data.get('type', 'celery')
if task_id:
self.handle_task(task_id)
def wait_util_log_path_exist(self, task_id):
log_path = get_celery_task_log_path(task_id)
log_path = self.get_log_path(task_id)
while not self.disconnected:
if not os.path.exists(log_path):
self.send_json({'message': '.', 'task': task_id})
......@@ -70,5 +82,3 @@ class CeleryLogWebsocket(JsonWebsocketConsumer):
def disconnect(self, close_code):
self.disconnected = True
self.close()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册