diff --git a/apps/common/cache.py b/apps/common/cache.py index 8cf76154c080acfaa3c5ef4d740a0edfbeca771b..0e2415691f269c57f5dbf87ddd73027e466a01ba 100644 --- a/apps/common/cache.py +++ b/apps/common/cache.py @@ -72,7 +72,7 @@ class Cache(metaclass=CacheBase): def get_data(self) -> dict: data = cache.get(self.key) - logger.debug(f'CACHE: get {self.key} = {data}') + logger.debug(f'Get data from cache: key={self.key} data={data}') if data is not None: data = json.loads(data) self._data = data @@ -81,7 +81,7 @@ class Cache(metaclass=CacheBase): def set_data(self, data): self._data = data to_json = json.dumps(data) - logger.info(f'CACHE: set {self.key} = {to_json}, timeout={self.timeout}') + logger.info(f'Set data to cache: key={self.key} data={to_json} timeout={self.timeout}') cache.set(self.key, to_json, timeout=self.timeout) def compute_data(self, *fields): @@ -122,6 +122,16 @@ class Cache(metaclass=CacheBase): self.set_data(data) return data + def expire_fields_with_lock(self, *fields): + with DistributedLock(name=f'{self.key}.refresh'): + data = self.get_data() + if data is not None: + logger.info(f'Expire cached fields: key={self.key} fields={fields}') + for f in fields: + data.pop(f) + self.set_data(data) + return data + def refresh(self, *fields): if not fields: # 没有指定 field 要刷新所有的值 @@ -146,10 +156,13 @@ class Cache(metaclass=CacheBase): def reload(self): self._data = None - def delete(self): - self._data = None - logger.info(f'CACHE: delete {self.key}') - cache.delete(self.key) + def expire(self, *fields): + if not fields: + self._data = None + logger.info(f'Delete cached key: key={self.key}') + cache.delete(self.key) + else: + self.expire_fields_with_lock(*fields) class CacheValueDesc: @@ -167,7 +180,8 @@ class CacheValueDesc: return self if self.field_name not in instance.data: instance.refresh(self.field_name) - value = instance.data[self.field_name] + # 防止边界情况没有值,报错 + value = instance.data.get(self.field_name) return value def compute_value(self, instance: Cache): @@ -183,5 +197,5 @@ class CacheValueDesc: new_value = compute_func() new_value = self.field_type.field_type(new_value) - logger.info(f'CACHE: compute {instance.key}.{self.field_name} = {new_value}') + logger.info(f'Compute cache field value: key={instance.key} field={self.field_name} value={new_value}') return new_value diff --git a/apps/common/utils/lock.py b/apps/common/utils/lock.py index 1a016d3d4bab134da6a2bb565ba0f54ee48512dd..ffb6217e36749d6dd0d9e3ade71743e4a1209491 100644 --- a/apps/common/utils/lock.py +++ b/apps/common/utils/lock.py @@ -158,6 +158,7 @@ class DistributedLock(RedisLock): def _release(self): try: self._release_redis_lock() + logger.debug(f'I[{self.id}] released lock[{self.name}]') except NotAcquired as e: logger.error(f'I[{self.id}] release lock[{self.name}] failed {e}') self._raise_exc(e) diff --git a/apps/orgs/cache.py b/apps/orgs/cache.py index 9b1c8ffa9abc9832856aecaeea841dcb7a668816..f0a9cb83eb7e69c1fb58490b3697df62a6b52846 100644 --- a/apps/orgs/cache.py +++ b/apps/orgs/cache.py @@ -32,3 +32,8 @@ class OrgRelatedCache(Cache): logger.info(f'CACHE: Send refresh task {self}.{fields}') refresh_org_cache_task.delay(self, *fields) on_commit(func) + + def expire(self, *fields): + def func(): + super(OrgRelatedCache, self).expire(*fields) + on_commit(func) diff --git a/apps/orgs/signals_handler.py b/apps/orgs/signals_handler.py index 57c7f54900b6d32feca4c2e54db17af4a1e3112b..7cf3a9c29a7ca5cb53c6e2f50dcb35d32193dc87 100644 --- a/apps/orgs/signals_handler.py +++ b/apps/orgs/signals_handler.py @@ -118,7 +118,7 @@ def refresh_user_amount_on_user_create_or_delete(user_id): orgs = Organization.objects.filter(m2m_org_members__user_id=user_id).distinct() for org in orgs: org_cache = OrgResourceStatisticsCache(org) - org_cache.refresh_async('users_amount') + org_cache.expire('users_amount') @receiver(post_save, sender=User) @@ -144,7 +144,7 @@ def on_org_user_changed_refresh_cache(sender, action, instance, reverse, pk_set, for org in orgs: org_cache = OrgResourceStatisticsCache(org) - org_cache.refresh_async('users_amount') + org_cache.expire('users_amount') class OrgResourceStatisticsRefreshUtil: @@ -166,7 +166,7 @@ class OrgResourceStatisticsRefreshUtil: cache_field_name = cls.model_cache_field_mapper.get(type(instance)) if cache_field_name: org_cache = OrgResourceStatisticsCache(instance.org) - org_cache.refresh_async(cache_field_name) + org_cache.expire(cache_field_name) @receiver(post_save)