From 682f6b2fb9a29bdeb51a421d494c8ad641a92729 Mon Sep 17 00:00:00 2001 From: xinwen Date: Mon, 12 Apr 2021 16:35:03 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E8=B5=84=E4=BA=A7=E8=8A=82=E7=82=B9?= =?UTF-8?q?=E5=85=B3=E7=B3=BB=E5=8F=98=E5=8C=96=E6=97=B6=E4=B9=9F=E8=A6=81?= =?UTF-8?q?=E6=B8=85=E7=A9=BA=20root=20=E7=BB=84=E7=BB=87=E7=9A=84=20node?= =?UTF-8?q?=5Fassets=5Fmapping?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/assets/models/node.py | 9 +++++ .../signals_handler/node_assets_mapping.py | 34 +++++++++++++------ 2 files changed, 32 insertions(+), 11 deletions(-) diff --git a/apps/assets/models/node.py b/apps/assets/models/node.py index ad17a8be9..973df4b4a 100644 --- a/apps/assets/models/node.py +++ b/apps/assets/models/node.py @@ -307,6 +307,15 @@ class NodeAllAssetsMappingMixin: org_id = str(org_id) cls.orgid_nodekey_assetsid_mapping.pop(org_id, None) + @classmethod + def expire_all_orgs_node_all_asset_ids_mapping_from_memory(cls): + orgs = Organization.objects.all() + org_ids = [str(org.id) for org in orgs] + org_ids.append(Organization.ROOT_ID) + + for id in org_ids: + cls.expire_node_all_asset_ids_mapping_from_memory(id) + # get order: from memory -> (from cache -> to generate) @classmethod def get_node_all_asset_ids_mapping_from_cache_or_generate_to_cache(cls, org_id): diff --git a/apps/assets/signals_handler/node_assets_mapping.py b/apps/assets/signals_handler/node_assets_mapping.py index 4e2b0d07b..efa3cb29f 100644 --- a/apps/assets/signals_handler/node_assets_mapping.py +++ b/apps/assets/signals_handler/node_assets_mapping.py @@ -13,6 +13,7 @@ from common.signals import django_ready from common.utils.connection import RedisPubSub from common.utils import get_logger from assets.models import Asset, Node +from orgs.models import Organization logger = get_logger(__file__) @@ -36,13 +37,18 @@ node_assets_mapping_for_memory_pub_sub = NodeAssetsMappingForMemoryPubSub() def expire_node_assets_mapping_for_memory(org_id): # 所有进程清除(自己的 memory 数据) org_id = str(org_id) - node_assets_mapping_for_memory_pub_sub.publish(org_id) + root_org_id = Organization.ROOT_ID + # 当前进程清除(cache 数据) logger.debug( "Expire node assets id mapping from cache of org={}, pid={}" "".format(org_id, os.getpid()) ) Node.expire_node_all_asset_ids_mapping_from_cache(org_id) + Node.expire_node_all_asset_ids_mapping_from_cache(root_org_id) + + node_assets_mapping_for_memory_pub_sub.publish(org_id) + node_assets_mapping_for_memory_pub_sub.publish(root_org_id) @receiver(post_save, sender=Node) @@ -73,16 +79,22 @@ def subscribe_node_assets_mapping_expire(sender, **kwargs): logger.debug("Start subscribe for expire node assets id mapping from memory") def keep_subscribe(): - subscribe = node_assets_mapping_for_memory_pub_sub.subscribe() - for message in subscribe.listen(): - if message["type"] != "message": - continue - org_id = message['data'].decode() - Node.expire_node_all_asset_ids_mapping_from_memory(org_id) - logger.debug( - "Expire node assets id mapping from memory of org={}, pid={}" - "".format(str(org_id), os.getpid()) - ) + while True: + try: + subscribe = node_assets_mapping_for_memory_pub_sub.subscribe() + for message in subscribe.listen(): + if message["type"] != "message": + continue + org_id = message['data'].decode() + Node.expire_node_all_asset_ids_mapping_from_memory(org_id) + logger.debug( + "Expire node assets id mapping from memory of org={}, pid={}" + "".format(str(org_id), os.getpid()) + ) + except Exception as e: + logger.exception(f'subscribe_node_assets_mapping_expire: {e}') + Node.expire_all_orgs_node_all_asset_ids_mapping_from_memory() + t = threading.Thread(target=keep_subscribe) t.daemon = True t.start() -- GitLab