未验证 提交 b8f175e4 编写于 作者: F fit2bot 提交者: GitHub

perf(celery-task): 优化检查节点资产数量的 Celery 任务 (#5052)

Co-authored-by: Nxinwen <coderWen@126.com>
上级 a626ff5a
from celery import shared_task
from ops.celery.decorator import register_as_period_task
from assets.utils import check_node_assets_amount
from common.utils import get_logger
from common.utils.timezone import now
logger = get_logger(__file__)
@shared_task()
@register_as_period_task(crontab='* 2 * * *')
@shared_task(queue='celery_heavy_tasks')
def check_node_assets_amount_celery_task():
logger.info(f'>>> {now()} begin check_node_assets_amount_celery_task ...')
check_node_assets_amount()
logger.info(f'>>> {now()} end check_node_assets_amount_celery_task ...')
# ~*~ coding: utf-8 ~*~
#
import time
from django.db.models import Q
from common.utils import get_logger, dict_get_any, is_uuid, get_object_or_none
......@@ -12,15 +14,18 @@ logger = get_logger(__file__)
def check_node_assets_amount():
for node in Node.objects.all():
logger.info(f'Check node assets amount: {node}')
assets_amount = Asset.objects.filter(
Q(nodes__key__istartswith=f'{node.key}:') | Q(nodes=node)
).distinct().count()
if node.assets_amount != assets_amount:
print(f'>>> <Node:{node.key}> wrong assets amount '
f'{node.assets_amount} right is {assets_amount}')
logger.warn(f'Node wrong assets amount <Node:{node.key}> '
f'{node.assets_amount} right is {assets_amount}')
node.assets_amount = assets_amount
node.save()
# 防止自检程序给数据库的压力太大
time.sleep(0.1)
def is_asset_exists_in_node(asset_pk, node_key):
......
......@@ -29,16 +29,3 @@ configs["CELERY_ROUTES"] = {
app.namespace = 'CELERY'
app.conf.update(configs)
app.autodiscover_tasks(lambda: [app_config.split('.')[0] for app_config in settings.INSTALLED_APPS])
app.conf.beat_schedule = {
'check-asset-permission-expired': {
'task': 'perms.tasks.check_asset_permission_expired',
'schedule': settings.PERM_EXPIRED_CHECK_PERIODIC,
'args': ()
},
'check-node-assets-amount': {
'task': 'assets.tasks.nodes_amount.check_node_assets_amount_celery_task',
'schedule': crontab(minute=0, hour=0),
'args': ()
},
}
......@@ -5,10 +5,12 @@ from datetime import timedelta
from django.db import transaction
from django.db.models import Q
from django.db.transaction import atomic
from django.conf import settings
from celery import shared_task
from common.utils import get_logger
from common.utils.timezone import now, dt_formater, dt_parser
from users.models import User
from ops.celery.decorator import register_as_period_task
from assets.models import Node
from perms.models import RebuildUserTreeTask, AssetPermission
from perms.utils.asset.user_permission import rebuild_user_mapping_nodes_if_need_with_lock, lock
......@@ -33,7 +35,8 @@ def dispatch_mapping_node_tasks():
rebuild_user_mapping_nodes_celery_task.delay(id)
@shared_task(queue='check_asset_perm_expired')
@register_as_period_task(interval=settings.PERM_EXPIRED_CHECK_PERIODIC)
@shared_task(queue='celery_check_asset_perm_expired')
@atomic()
def check_asset_permission_expired():
"""
......
......@@ -156,7 +156,10 @@ def is_running(s, unlink=True):
def parse_service(s):
web_services = ['gunicorn', 'flower', 'daphne']
celery_services = ["celery_ansible", "celery_default", "celery_node_tree", "check_asset_perm_expired"]
celery_services = [
"celery_ansible", "celery_default", "celery_node_tree",
"celery_check_asset_perm_expired", "celery_heavy_tasks"
]
task_services = celery_services + ['beat']
all_services = web_services + task_services
if s == 'all':
......@@ -225,9 +228,14 @@ def get_start_celery_node_tree_kwargs():
return get_start_worker_kwargs('node_tree', 2)
def get_start_celery_heavy_tasks_kwargs():
print("\n- Start Celery as Distributed Task Queue: HeavyTasks")
return get_start_worker_kwargs('celery_heavy_tasks', 1)
def get_start_celery_check_asset_perm_expired_kwargs():
print("\n- Start Celery as Distributed Task Queue: CheckAseetPermissionExpired")
return get_start_worker_kwargs('check_asset_perm_expired', 1)
return get_start_worker_kwargs('celery_check_asset_perm_expired', 1)
def get_start_worker_kwargs(queue, num):
......@@ -366,7 +374,8 @@ def start_service(s):
"celery_ansible": get_start_celery_ansible_kwargs,
"celery_default": get_start_celery_default_kwargs,
"celery_node_tree": get_start_celery_node_tree_kwargs,
"check_asset_perm_expired": get_start_celery_check_asset_perm_expired_kwargs,
"celery_heavy_tasks": get_start_celery_heavy_tasks_kwargs,
"celery_check_asset_perm_expired": get_start_celery_check_asset_perm_expired_kwargs,
"beat": get_start_beat_kwargs,
"flower": get_start_flower_kwargs,
"daphne": get_start_daphne_kwargs,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册