提交 682f6b2f 编写于 作者: X xinwen 提交者: baltery

fix: 资产节点关系变化时也要清空 root 组织的 node_assets_mapping

上级 a2e39799
...@@ -307,6 +307,15 @@ class NodeAllAssetsMappingMixin: ...@@ -307,6 +307,15 @@ class NodeAllAssetsMappingMixin:
org_id = str(org_id) org_id = str(org_id)
cls.orgid_nodekey_assetsid_mapping.pop(org_id, None) cls.orgid_nodekey_assetsid_mapping.pop(org_id, None)
@classmethod
def expire_all_orgs_node_all_asset_ids_mapping_from_memory(cls):
orgs = Organization.objects.all()
org_ids = [str(org.id) for org in orgs]
org_ids.append(Organization.ROOT_ID)
for id in org_ids:
cls.expire_node_all_asset_ids_mapping_from_memory(id)
# get order: from memory -> (from cache -> to generate) # get order: from memory -> (from cache -> to generate)
@classmethod @classmethod
def get_node_all_asset_ids_mapping_from_cache_or_generate_to_cache(cls, org_id): def get_node_all_asset_ids_mapping_from_cache_or_generate_to_cache(cls, org_id):
......
...@@ -13,6 +13,7 @@ from common.signals import django_ready ...@@ -13,6 +13,7 @@ from common.signals import django_ready
from common.utils.connection import RedisPubSub from common.utils.connection import RedisPubSub
from common.utils import get_logger from common.utils import get_logger
from assets.models import Asset, Node from assets.models import Asset, Node
from orgs.models import Organization
logger = get_logger(__file__) logger = get_logger(__file__)
...@@ -36,13 +37,18 @@ node_assets_mapping_for_memory_pub_sub = NodeAssetsMappingForMemoryPubSub() ...@@ -36,13 +37,18 @@ node_assets_mapping_for_memory_pub_sub = NodeAssetsMappingForMemoryPubSub()
def expire_node_assets_mapping_for_memory(org_id): def expire_node_assets_mapping_for_memory(org_id):
# 所有进程清除(自己的 memory 数据) # 所有进程清除(自己的 memory 数据)
org_id = str(org_id) org_id = str(org_id)
node_assets_mapping_for_memory_pub_sub.publish(org_id) root_org_id = Organization.ROOT_ID
# 当前进程清除(cache 数据) # 当前进程清除(cache 数据)
logger.debug( logger.debug(
"Expire node assets id mapping from cache of org={}, pid={}" "Expire node assets id mapping from cache of org={}, pid={}"
"".format(org_id, os.getpid()) "".format(org_id, os.getpid())
) )
Node.expire_node_all_asset_ids_mapping_from_cache(org_id) Node.expire_node_all_asset_ids_mapping_from_cache(org_id)
Node.expire_node_all_asset_ids_mapping_from_cache(root_org_id)
node_assets_mapping_for_memory_pub_sub.publish(org_id)
node_assets_mapping_for_memory_pub_sub.publish(root_org_id)
@receiver(post_save, sender=Node) @receiver(post_save, sender=Node)
...@@ -73,16 +79,22 @@ def subscribe_node_assets_mapping_expire(sender, **kwargs): ...@@ -73,16 +79,22 @@ def subscribe_node_assets_mapping_expire(sender, **kwargs):
logger.debug("Start subscribe for expire node assets id mapping from memory") logger.debug("Start subscribe for expire node assets id mapping from memory")
def keep_subscribe(): def keep_subscribe():
subscribe = node_assets_mapping_for_memory_pub_sub.subscribe() while True:
for message in subscribe.listen(): try:
if message["type"] != "message": subscribe = node_assets_mapping_for_memory_pub_sub.subscribe()
continue for message in subscribe.listen():
org_id = message['data'].decode() if message["type"] != "message":
Node.expire_node_all_asset_ids_mapping_from_memory(org_id) continue
logger.debug( org_id = message['data'].decode()
"Expire node assets id mapping from memory of org={}, pid={}" Node.expire_node_all_asset_ids_mapping_from_memory(org_id)
"".format(str(org_id), os.getpid()) logger.debug(
) "Expire node assets id mapping from memory of org={}, pid={}"
"".format(str(org_id), os.getpid())
)
except Exception as e:
logger.exception(f'subscribe_node_assets_mapping_expire: {e}')
Node.expire_all_orgs_node_all_asset_ids_mapping_from_memory()
t = threading.Thread(target=keep_subscribe) t = threading.Thread(target=keep_subscribe)
t.daemon = True t.daemon = True
t.start() t.start()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册