diff --git a/apps/assets/models/node.py b/apps/assets/models/node.py index ad17a8be998bba304f5cc1986f705ebbbcf0e324..973df4b4ac876f1008aea6dde7c8fbf3f977428b 100644 --- a/apps/assets/models/node.py +++ b/apps/assets/models/node.py @@ -307,6 +307,15 @@ class NodeAllAssetsMappingMixin: org_id = str(org_id) cls.orgid_nodekey_assetsid_mapping.pop(org_id, None) + @classmethod + def expire_all_orgs_node_all_asset_ids_mapping_from_memory(cls): + orgs = Organization.objects.all() + org_ids = [str(org.id) for org in orgs] + org_ids.append(Organization.ROOT_ID) + + for id in org_ids: + cls.expire_node_all_asset_ids_mapping_from_memory(id) + # get order: from memory -> (from cache -> to generate) @classmethod def get_node_all_asset_ids_mapping_from_cache_or_generate_to_cache(cls, org_id): diff --git a/apps/assets/signals_handler/node_assets_mapping.py b/apps/assets/signals_handler/node_assets_mapping.py index 4e2b0d07b24bd6e581d52c9dae7e4bb9602a81f2..efa3cb29ff52b82fdf8e9d2d9e3faa01d2128a54 100644 --- a/apps/assets/signals_handler/node_assets_mapping.py +++ b/apps/assets/signals_handler/node_assets_mapping.py @@ -13,6 +13,7 @@ from common.signals import django_ready from common.utils.connection import RedisPubSub from common.utils import get_logger from assets.models import Asset, Node +from orgs.models import Organization logger = get_logger(__file__) @@ -36,13 +37,18 @@ node_assets_mapping_for_memory_pub_sub = NodeAssetsMappingForMemoryPubSub() def expire_node_assets_mapping_for_memory(org_id): # 所有进程清除(自己的 memory 数据) org_id = str(org_id) - node_assets_mapping_for_memory_pub_sub.publish(org_id) + root_org_id = Organization.ROOT_ID + # 当前进程清除(cache 数据) logger.debug( "Expire node assets id mapping from cache of org={}, pid={}" "".format(org_id, os.getpid()) ) Node.expire_node_all_asset_ids_mapping_from_cache(org_id) + Node.expire_node_all_asset_ids_mapping_from_cache(root_org_id) + + node_assets_mapping_for_memory_pub_sub.publish(org_id) + node_assets_mapping_for_memory_pub_sub.publish(root_org_id) @receiver(post_save, sender=Node) @@ -73,16 +79,22 @@ def subscribe_node_assets_mapping_expire(sender, **kwargs): logger.debug("Start subscribe for expire node assets id mapping from memory") def keep_subscribe(): - subscribe = node_assets_mapping_for_memory_pub_sub.subscribe() - for message in subscribe.listen(): - if message["type"] != "message": - continue - org_id = message['data'].decode() - Node.expire_node_all_asset_ids_mapping_from_memory(org_id) - logger.debug( - "Expire node assets id mapping from memory of org={}, pid={}" - "".format(str(org_id), os.getpid()) - ) + while True: + try: + subscribe = node_assets_mapping_for_memory_pub_sub.subscribe() + for message in subscribe.listen(): + if message["type"] != "message": + continue + org_id = message['data'].decode() + Node.expire_node_all_asset_ids_mapping_from_memory(org_id) + logger.debug( + "Expire node assets id mapping from memory of org={}, pid={}" + "".format(str(org_id), os.getpid()) + ) + except Exception as e: + logger.exception(f'subscribe_node_assets_mapping_expire: {e}') + Node.expire_all_orgs_node_all_asset_ids_mapping_from_memory() + t = threading.Thread(target=keep_subscribe) t.daemon = True t.start()