tcache.c 27.8 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#define _DEFAULT_SOURCE
S
cache  
Shengliang Guan 已提交
17
#include "tcache.h"
18
#include "taoserror.h"
S
log  
Shengliang Guan 已提交
19
#include "tlog.h"
H
hzcheng 已提交
20 21
#include "tutil.h"

22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
static pthread_t       cacheRefreshWorker = {0};            // the shared refresh thread, created once by doInitRefreshThread
static pthread_once_t  cacheThreadInit = PTHREAD_ONCE_INIT;   // makes doInitRefreshThread run only once
static pthread_mutex_t guard = PTHREAD_MUTEX_INITIALIZER;     // protects pCacheArrayList
static SArray         *pCacheArrayList = NULL;                // registered cache objects (elements are SCacheObj *)
static bool            stopRefreshWorker = false;             // checked by the refresh thread after every sleep; true makes it exit
static bool            refreshWorkerNormalStopped = false;    // read by taosCacheCleanup; presumably set when the worker exits normally
static bool            refreshWorkerUnexpectedStopped = false; // set by the atexit hook when the worker did not stop normally
// NOTE(review): the three flags above are plain bools written and read from different threads without a lock or
// atomic accessors; consider switching them to atomic operations.

/*
 * A single cached item. The node header, a private copy of the data and a private copy of the key share one
 * allocation (see taosCreateCacheNode), laid out as [SCacheNode][data: dataLen bytes][key: keyLen bytes].
 * The key copy is NOT NUL-terminated.
 */
typedef struct SCacheNode {
  uint64_t           addedTime;   // the time (ms) when this element is added or updated into cache
  uint64_t           lifespan;    // life duration (ms) after which this element should be removed from cache
  int64_t            expireTime;  // expire time: addedTime + lifespan, pushed forward on release when extendLifespan is set
  uint64_t           signature;   // equals the node's own address while the node is valid
  struct STrashElem *pTNodeHeader;    // point to trash node head
  uint16_t           keyLen: 15;    // 15-bit field: max key length is 32767 bytes
  bool               inTrashcan : 1;  // denote if it is in trash or not
  uint32_t           size;            // allocated size for current SCacheNode (header + data + key)
  uint32_t           dataLen;
  T_REF_DECLARE()
  struct SCacheNode *pNext;        // next node in the same hash slot
  char              *key;          // points into this node's allocation, right after the data
  char              *data;         // points into this node's allocation, right after the header
} SCacheNode;

// One hash slot: a singly linked list of the nodes whose keys hash to this slot.
typedef struct SCacheEntry {
  int32_t     num;      // number of elements in current entry
  SRWLatch    latch;    // entry latch, guards the node list of this slot
  SCacheNode *next;     // head of the node list
} SCacheEntry;

// Element of the doubly linked trashcan list, which holds nodes removed from the hash table while still referenced.
typedef struct STrashElem {
  struct STrashElem *prev;
  struct STrashElem *next;
  SCacheNode        *pData;    // the trashed node; its pTNodeHeader points back to this element
} STrashElem;

/*
 * A cache object: a fixed-size hash table of SCacheNode plus a trashcan.
 *
 * When a new node is put into the cache and a node with the same key already exists in the hash table:
 * 1. if the old node is not referenced, it is freed and replaced by the new one;
 * 2. otherwise, the old node is moved to pTrash and the new one is added to the hash table.
 *
 * A node in pTrash is freed once it is no longer referenced: either by the release of its last reference, or when
 * the trashcan is emptied.
 */
struct SCacheObj {
  int64_t            sizeInBytes;  // total allocated buffer in this hash table, SCacheObj is not included.
  int64_t            refreshTime;  // refresh interval in ms (taosCacheInit converts it from seconds)
  char              *name;         // cache name, used in log messages
  SCacheStatis       statistics;   // hit / miss / total access counters

  SCacheEntry       *pEntryList;   // array of `capacity` hash slots
  size_t             capacity;     // number of slots
  size_t             numOfElems;         // number of elements in cache
  _hash_fn_t         hashFp;       // hash function
  __cache_free_fn_t  freeFp;       // optional callback applied to a node's data before the node is freed

  uint32_t           numOfElemsInTrash;  // number of element in trash
  STrashElem        *pTrash;             // head of the trashcan list

  uint8_t            deleting;           // set the deleting flag to stop refreshing ASAP.
  pthread_t          refreshWorker;      // NOTE(review): not assigned by taosCacheInit in the visible code
  bool               extendLifespan;  // when set, releasing a node (without removal) pushes its expireTime to now + lifespan
  int64_t            checkTick;       // refreshTime divided by the refresh thread's 500 ms sleep interval
#if defined(LINUX)
  pthread_rwlock_t lock;   // guards pTrash / numOfElemsInTrash (always taken for writing)
#else
  pthread_mutex_t lock;    // guards pTrash / numOfElemsInTrash
#endif
};

// State handed to the per-node callbacks invoked by doTraverseElems.
typedef struct SCacheObjTravSup {
  SCacheObj        *pCacheObj;  // cache being traversed
  int64_t           time;       // timestamp (ms) that doRemoveExpiredFn compares expireTime against
  __cache_trav_fn_t fp;         // optional user callback applied to the data of every node that is kept
  void             *param1;     // user parameter passed to fp
} SCacheObjTravSup;

static FORCE_INLINE void __trashcan_wr_lock(SCacheObj *pCacheObj) {
100
#if defined(LINUX)
101
  pthread_rwlock_wrlock(&pCacheObj->lock);
102
#else
103
  pthread_mutex_lock(&pCacheObj->lock);
104 105 106
#endif
}

107
static FORCE_INLINE void __trashcan_unlock(SCacheObj *pCacheObj) {
108
#if defined(LINUX)
109
  pthread_rwlock_unlock(&pCacheObj->lock);
110
#else
111
  pthread_mutex_unlock(&pCacheObj->lock);
112 113 114
#endif
}

115
static FORCE_INLINE int32_t __trashcan_lock_init(SCacheObj *pCacheObj) {
116
#if defined(LINUX)
117
  return pthread_rwlock_init(&pCacheObj->lock, NULL);
118
#else
119
  return pthread_mutex_init(&pCacheObj->lock, NULL);
120 121 122
#endif
}

123
static FORCE_INLINE void __trashcan_lock_destroy(SCacheObj *pCacheObj) {
124
#if defined(LINUX)
125
  pthread_rwlock_destroy(&pCacheObj->lock);
126
#else
127
  pthread_mutex_destroy(&pCacheObj->lock);
128 129 130
#endif
}

131 132 133 134 135 136 137 138 139 140
/**
 * do cleanup the taos cache
 * @param pCacheObj
 */
static void doCleanupDataCache(SCacheObj *pCacheObj);

/**
 * refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime
 * @param handle   Cache object handle
 */
S
cache  
Shengliang Guan 已提交
141
static void *taosCacheTimedRefresh(void *handle);
142

H
Haojun Liao 已提交
143
static void doInitRefreshThread(void) {
144 145 146 147 148 149 150 151 152 153
  pCacheArrayList = taosArrayInit(4, POINTER_BYTES);

  pthread_attr_t thattr;
  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);

  pthread_create(&cacheRefreshWorker, &thattr, taosCacheTimedRefresh, NULL);
  pthread_attr_destroy(&thattr);
}

S
cache  
Shengliang Guan 已提交
154
pthread_t doRegisterCacheObj(SCacheObj *pCacheObj) {
155 156 157 158 159 160 161 162 163
  pthread_once(&cacheThreadInit, doInitRefreshThread);

  pthread_mutex_lock(&guard);
  taosArrayPush(pCacheArrayList, &pCacheObj);
  pthread_mutex_unlock(&guard);

  return cacheRefreshWorker;
}

H
hzcheng 已提交
164 165 166 167 168 169
/**
 * @param key      key of object for hash, usually a null-terminated string
 * @param keyLen   length of key
 * @param pData    actually data. required a consecutive memory block, no pointer is allowed
 *                 in pData. Pointer copy causes memory access error.
 * @param size     size of block
170
 * @param lifespan total survial expiredTime from now
171
 * @return         SCacheNode
H
hzcheng 已提交
172
 */
173
static SCacheNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size,
S
cache  
Shengliang Guan 已提交
174
                                           uint64_t duration);
H
hzcheng 已提交
175 176

/**
177
 * addedTime object node into trash, and this object is closed for referencing if it is addedTime to trash
H
hzcheng 已提交
178
 * It will be removed until the pNode->refCount == 0
179
 * @param pCacheObj    Cache object
H
hzcheng 已提交
180 181
 * @param pNode   Cache slot object
 */
182
static void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheNode *pNode);
183

H
hzcheng 已提交
184 185 186
/**
 * remove nodes in trash with refCount == 0 in cache
 * @param pNode
187
 * @param pCacheObj
H
hzcheng 已提交
188 189 190
 * @param force   force model, if true, remove data in trash without check refcount.
 *                may cause corruption. So, forece model only applys before cache is closed
 */
H
Haojun Liao 已提交
191
static void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force);
H
hzcheng 已提交
192 193 194

/**
 * release node
195
 * @param pCacheObj      cache object
H
Haojun Liao 已提交
196
 * @param pNode          data node
H
hzcheng 已提交
197
 */
198
static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheNode *pNode) {
H
hzcheng 已提交
199
  if (pNode->signature != (uint64_t)pNode) {
S
slguan 已提交
200
    uError("key:%s, %p data is invalid, or has been released", pNode->key, pNode);
H
hzcheng 已提交
201 202
    return;
  }
203

204
  atomic_sub_fetch_64(&pCacheObj->sizeInBytes, pNode->size);
H
Haojun Liao 已提交
205

H
Haojun Liao 已提交
206
  uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, total num:%d size:%" PRId64 "bytes",
207
         pCacheObj->name, pNode->key, pNode->data, pNode->size, (int)pCacheObj->numOfElems - 1, pCacheObj->sizeInBytes);
H
Haojun Liao 已提交
208 209 210 211

  if (pCacheObj->freeFp) {
    pCacheObj->freeFp(pNode->data);
  }
212

H
hzcheng 已提交
213 214 215
  free(pNode);
}

S
cache  
Shengliang Guan 已提交
216 217
static FORCE_INLINE STrashElem *doRemoveElemInTrashcan(SCacheObj *pCacheObj, STrashElem *pElem) {
  if (pElem->pData->signature != (uint64_t)pElem->pData) {
H
Haojun Liao 已提交
218
    uWarn("key:sig:0x%" PRIx64 " %p data has been released, ignore", pElem->pData->signature, pElem->pData);
H
Haojun Liao 已提交
219
    return NULL;
H
Haojun Liao 已提交
220 221
  }

S
cache  
Shengliang Guan 已提交
222
  STrashElem *next = pElem->next;
H
Haojun Liao 已提交
223

H
Haojun Liao 已提交
224 225 226
  pCacheObj->numOfElemsInTrash--;
  if (pElem->prev) {
    pElem->prev->next = pElem->next;
S
cache  
Shengliang Guan 已提交
227
  } else {  // pnode is the header, update header
H
Haojun Liao 已提交
228 229 230
    pCacheObj->pTrash = pElem->next;
  }

H
Haojun Liao 已提交
231 232
  if (next) {
    next->prev = pElem->prev;
H
Haojun Liao 已提交
233
  }
H
Haojun Liao 已提交
234 235 236 237 238 239

  if (pCacheObj->numOfElemsInTrash == 0) {
    assert(pCacheObj->pTrash == NULL);
  }

  return next;
H
Haojun Liao 已提交
240 241
}

S
cache  
Shengliang Guan 已提交
242
static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj *pCacheObj, STrashElem *pElem) {
H
Haojun Liao 已提交
243 244 245 246 247 248 249
  if (pCacheObj->freeFp) {
    pCacheObj->freeFp(pElem->pData->data);
  }

  free(pElem->pData);
  free(pElem);
}
H
hzcheng 已提交
250

251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289
static void pushfrontNodeInEntryList(SCacheEntry *pEntry, SCacheNode *pNode) {
  assert(pNode != NULL && pEntry != NULL);

  pNode->pNext = pEntry->next;
  pEntry->next = pNode;
  pEntry->num += 1;
}

/*
 * Unlink pNode from the node list of pe and uncount it. prev must be the node preceding pNode, or NULL when pNode
 * is the list head. pNode's own pNext link is left untouched.
 */
static void removeNodeInEntryList(SCacheEntry* pe, SCacheNode* prev, SCacheNode* pNode) {
  SCacheNode *pSuccessor = pNode->pNext;

  if (prev != NULL) {
    prev->pNext = pSuccessor;
  } else {
    ASSERT(pe->next == pNode);
    pe->next = pSuccessor;
  }

  pe->num--;
}

/* Return the hash slot for key: the cache's hash of the key, modulo the number of slots. */
static FORCE_INLINE SCacheEntry* doFindEntry(SCacheObj* pCacheObj, const void* key, size_t keyLen) {
  const uint32_t hash = (*pCacheObj->hashFp)(key, keyLen);
  const int32_t  idx  = hash % pCacheObj->capacity;
  return pCacheObj->pEntryList + idx;
}

/*
 * Linear scan of pe's node list for the node whose key equals key[0..keyLen). Returns that node, or NULL.
 * Every node passed over is stored into *prev, so on a hit *prev is the predecessor of the returned node. When the
 * hit is the list head, *prev is not written, so callers initialize it to NULL.
 */
static FORCE_INLINE SCacheNode *
doSearchInEntryList(SCacheEntry *pe, const void *key, size_t keyLen, SCacheNode** prev) {
  for (SCacheNode *pCur = pe->next; pCur != NULL; pCur = pCur->pNext) {
    if (pCur->keyLen == keyLen && memcmp(pCur->key, key, keyLen) == 0) {
      return pCur;
    }
    *prev = pCur;
  }

  return NULL;
}

S
cache  
Shengliang Guan 已提交
290 291 292
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn,
                         const char *cacheName) {
  const int32_t SLEEP_DURATION = 500;  // 500 ms
293

H
Haojun Liao 已提交
294
  if (refreshTimeInSeconds <= 0) {
H
hzcheng 已提交
295 296
    return NULL;
  }
S
cache  
Shengliang Guan 已提交
297

298 299
  SCacheObj *pCacheObj = (SCacheObj *)calloc(1, sizeof(SCacheObj));
  if (pCacheObj == NULL) {
S
slguan 已提交
300
    uError("failed to allocate memory, reason:%s", strerror(errno));
H
hzcheng 已提交
301 302
    return NULL;
  }
S
cache  
Shengliang Guan 已提交
303

304 305
  pCacheObj->pEntryList = calloc(4096, sizeof(SCacheEntry));
  if (pCacheObj->pEntryList == NULL) {
306
    free(pCacheObj);
S
slguan 已提交
307
    uError("failed to allocate memory, reason:%s", strerror(errno));
H
hzcheng 已提交
308 309
    return NULL;
  }
S
cache  
Shengliang Guan 已提交
310

H
Haojun Liao 已提交
311
  // set free cache node callback function
312 313 314 315 316
  pCacheObj->capacity       = 4096;  // todo refactor
  pCacheObj->hashFp         = taosGetDefaultHashFunction(keyType);
  pCacheObj->freeFp         = fn;
  pCacheObj->refreshTime    = refreshTimeInSeconds * 1000;
  pCacheObj->checkTick      = pCacheObj->refreshTime / SLEEP_DURATION;
317
  pCacheObj->extendLifespan = extendLifespan;  // the TTL after the last access
318

319 320
  if (__trashcan_lock_init(pCacheObj) != 0) {
    tfree(pCacheObj->pEntryList);
321
    free(pCacheObj);
S
cache  
Shengliang Guan 已提交
322

S
slguan 已提交
323
    uError("failed to init lock, reason:%s", strerror(errno));
H
hzcheng 已提交
324 325
    return NULL;
  }
326

327
  pCacheObj->name = strdup(cacheName);
328
  doRegisterCacheObj(pCacheObj);
329
  return pCacheObj;
330
}
H
hzcheng 已提交
331

S
cache  
Shengliang Guan 已提交
332 333
void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize,
                   int32_t durationMS) {
334
  if (pCacheObj == NULL || pCacheObj->pEntryList == NULL || pCacheObj->deleting == 1) {
335 336
    return NULL;
  }
H
Haojun Liao 已提交
337

338
  SCacheNode *pNode1 = taosCreateCacheNode(key, keyLen, pData, dataSize, durationMS);
H
Haojun Liao 已提交
339 340 341 342 343
  if (pNode1 == NULL) {
    uError("cache:%s, key:%p, failed to added into cache, out of memory", pCacheObj->name, key);
    return NULL;
  }

H
Haojun Liao 已提交
344
  T_REF_INC(pNode1);
H
Haojun Liao 已提交
345

346
  SCacheEntry *pe = doFindEntry(pCacheObj, key, keyLen);
347

348
  taosWLockLatch(&pe->latch);
349

350 351
  SCacheNode *prev  = NULL;
  SCacheNode* pNode = doSearchInEntryList(pe, key, keyLen, &prev);
H
Haojun Liao 已提交
352

353 354 355 356 357 358 359 360 361 362 363
  if (pNode == NULL) {
    pushfrontNodeInEntryList(pe, pNode1);
    atomic_add_fetch_64(&pCacheObj->numOfElems, 1);
    atomic_add_fetch_64(&pCacheObj->sizeInBytes, pNode1->size);
    uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64
           ", totalNum:%d sizeInBytes:%" PRId64 "bytes size:%" PRId64 "bytes",
           pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime, (int32_t)pCacheObj->numOfElems,
           pCacheObj->sizeInBytes, (int64_t)dataSize);
  } else {  // duplicated key exists
    // move current node to trashcan
    removeNodeInEntryList(pe, prev, pNode);
H
Haojun Liao 已提交
364

365 366 367
    if (T_REF_VAL_GET(pNode) == 0) {
      if (pCacheObj->freeFp) {
        pCacheObj->freeFp(pNode->data);
H
Haojun Liao 已提交
368
      }
369 370 371 372 373 374

      atomic_sub_fetch_64(&pCacheObj->sizeInBytes, pNode->size);
      tfree(pNode);
    } else {
      taosAddToTrashcan(pCacheObj, pNode);
      uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode1->data, pNode->data);
H
Haojun Liao 已提交
375
    }
376 377 378 379 380 381 382

    pushfrontNodeInEntryList(pe, pNode1);
    atomic_add_fetch_64(&pCacheObj->sizeInBytes, pNode1->size);
    uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64
           ", totalNum:%d sizeInBytes:%" PRId64 "bytes size:%" PRId64 "bytes",
           pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime, (int32_t)pCacheObj->numOfElems,
           pCacheObj->sizeInBytes, (int64_t)dataSize);
H
Haojun Liao 已提交
383 384
  }

385
  taosWUnLockLatch(&pe->latch);
H
Haojun Liao 已提交
386
  return pNode1->data;
H
hzcheng 已提交
387 388
}

H
Haojun Liao 已提交
389
void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) {
390 391 392 393
  if (pCacheObj == NULL || pCacheObj->deleting == 1) {
    return NULL;
  }

394
  if (pCacheObj->numOfElems == 0) {
395
    atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
396
    return NULL;
397
  }
H
Haojun Liao 已提交
398

399 400 401 402
  SCacheNode *prev = NULL;
  SCacheEntry *pe = doFindEntry(pCacheObj, key, keyLen);

  taosRLockLatch(&pe->latch);
S
Shengliang Guan 已提交
403

404 405 406 407 408 409 410
  SCacheNode* pNode = doSearchInEntryList(pe, key, keyLen, &prev);
  if (pNode != NULL) {
    int32_t ref = T_REF_INC(pNode);
    ASSERT(ref > 0);
  }

  taosRUnLockLatch(&pe->latch);
S
Shengliang Guan 已提交
411

412
  void *pData = (pNode != NULL) ? pNode->data : NULL;
S
Shengliang Guan 已提交
413
  if (pData != NULL) {
414
    atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
S
cache  
Shengliang Guan 已提交
415
    uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData,
416
           T_REF_VAL_GET(pNode));
417
  } else {
418
    atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
H
Haojun Liao 已提交
419
    uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
420
  }
S
Shengliang Guan 已提交
421

422
  atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
S
Shengliang Guan 已提交
423
  return pData;
424 425
}

426 427
void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
  if (pCacheObj == NULL || data == NULL) return NULL;
S
cache  
Shengliang Guan 已提交
428

429
  SCacheNode *ptNode = (SCacheNode *)((char *)data - sizeof(SCacheNode));
430
  if (ptNode->signature != (uint64_t)ptNode) {
H
Haojun Liao 已提交
431
    uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode);
432 433
    return NULL;
  }
434

435
  int32_t ref = T_REF_INC(ptNode);
H
Haojun Liao 已提交
436
  uDebug("cache:%s, data: %p acquired by data in cache, refcnt:%d", pCacheObj->name, ptNode->data, ref);
H
Haojun Liao 已提交
437

438 439 440 441 442
  // the data if referenced by at least one object, so the reference count must be greater than the value of 2.
  assert(ref >= 2);
  return data;
}

443
void *taosCacheTransferData(SCacheObj *pCacheObj, void **data) {
H
Haojun Liao 已提交
444
  if (pCacheObj == NULL || data == NULL || (*data) == NULL) return NULL;
S
cache  
Shengliang Guan 已提交
445

446
  SCacheNode *ptNode = (SCacheNode *)((char *)(*data) - sizeof(SCacheNode));
447
  if (ptNode->signature != (uint64_t)ptNode) {
H
Haojun Liao 已提交
448
    uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode);
449 450
    return NULL;
  }
S
cache  
Shengliang Guan 已提交
451

452 453
  assert(T_REF_VAL_GET(ptNode) >= 1);
  char *d = *data;
S
cache  
Shengliang Guan 已提交
454

455 456 457
  // clear its reference to old area
  *data = NULL;
  return d;
H
hzcheng 已提交
458
}
459

460
void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
H
Haojun Liao 已提交
461
  if (pCacheObj == NULL) {
H
Haojun Liao 已提交
462 463 464
    return;
  }

H
Haojun Liao 已提交
465
  if ((*data) == NULL) {
H
Haojun Liao 已提交
466
    uError("cache:%s, NULL data to release", pCacheObj->name);
467 468
    return;
  }
H
Haojun Liao 已提交
469 470 471 472

  // The operation of removal from hash table and addition to trashcan is not an atomic operation,
  // therefore the check for the empty of both the hash table and the trashcan has a race condition.
  // It happens when there is only one object in the cache, and two threads which has referenced this object
H
Haojun Liao 已提交
473
  // start to free the it simultaneously [TD-1569].
474
  SCacheNode *pNode = (SCacheNode *)((char *)(*data) - sizeof(SCacheNode));
475
  if (pNode->signature != (uint64_t)pNode) {
H
Haojun Liao 已提交
476
    uError("cache:%s, %p, release invalid cache data", pCacheObj->name, pNode);
477 478
    return;
  }
479

480
  *data = NULL;
481

482
  // note: extend lifespan before dec ref count
H
Haojun Liao 已提交
483
  bool inTrashcan = pNode->inTrashcan;
H
Haojun Liao 已提交
484

H
Haojun Liao 已提交
485
  if (pCacheObj->extendLifespan && (!inTrashcan) && (!_remove)) {
486
    atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs());
S
cache  
Shengliang Guan 已提交
487
    uDebug("cache:%s, data:%p extend expire time: %" PRId64, pCacheObj->name, pNode->data, pNode->expireTime);
488
  }
H
Haojun Liao 已提交
489

H
Haojun Liao 已提交
490 491
  if (_remove) {
    // NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
S
cache  
Shengliang Guan 已提交
492 493
    char *key = pNode->key;
    char *d = pNode->data;
H
Haojun Liao 已提交
494

495
    int32_t ref = T_REF_VAL_GET(pNode);
H
Haojun Liao 已提交
496
    uDebug("cache:%s, key:%p, %p is released, refcnt:%d, in trashcan:%d", pCacheObj->name, key, d, ref - 1, inTrashcan);
H
Haojun Liao 已提交
497 498

    /*
S
cache  
Shengliang Guan 已提交
499 500
     * If it is not referenced by other users, remove it immediately. Otherwise move this node to trashcan wait for all
     * users releasing this resources.
H
Haojun Liao 已提交
501 502 503 504
     *
     * NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread
     * that tries to do the same thing.
     */
H
Haojun Liao 已提交
505
    if (inTrashcan) {
H
Haojun Liao 已提交
506
      ref = T_REF_VAL_GET(pNode);
507

H
Haojun Liao 已提交
508 509 510
      if (ref == 1) {
        // If it is the last ref, remove it from trashcan linked-list first, and then destroy it.Otherwise, it may be
        // destroyed by refresh worker if decrease ref count before removing it from linked-list.
H
Haojun Liao 已提交
511
        assert(pNode->pTNodeHeader->pData == pNode);
H
Haojun Liao 已提交
512

513
        __trashcan_wr_lock(pCacheObj);
H
Haojun Liao 已提交
514
        doRemoveElemInTrashcan(pCacheObj, pNode->pTNodeHeader);
515
        __trashcan_unlock(pCacheObj);
H
Haojun Liao 已提交
516

H
Haojun Liao 已提交
517 518 519
        ref = T_REF_DEC(pNode);
        assert(ref == 0);

H
Haojun Liao 已提交
520
        doDestroyTrashcanElem(pCacheObj, pNode->pTNodeHeader);
H
Haojun Liao 已提交
521 522
      } else {
        ref = T_REF_DEC(pNode);
H
Haojun Liao 已提交
523
        assert(ref >= 0);
H
Haojun Liao 已提交
524 525
      }
    } else {
526 527
      // NOTE: remove it from hash in the first place, otherwise, the pNode may have been released by other thread
      // when reaches here.
528 529 530 531
      SCacheNode * prev = NULL;
      SCacheEntry *pe = doFindEntry(pCacheObj, pNode->key, pNode->keyLen);

      taosWLockLatch(&pe->latch);
532
      ref = T_REF_DEC(pNode);
H
Haojun Liao 已提交
533

534 535 536 537 538
      SCacheNode *p = doSearchInEntryList(pe, pNode->key, pNode->keyLen, &prev);

      if (p != NULL) {
        // successfully remove from hash table, if failed, this node must have been move to trash already, do nothing.
        // note that the remove operation can be executed only once.
H
Haojun Liao 已提交
539
        if (p != pNode) {
S
cache  
Shengliang Guan 已提交
540
          uDebug(
541 542 543
              "cache:%s, key:%p, a new entry:%p found, refcnt:%d, prev entry:%p, refcnt:%d has been removed by "
              "others already, prev must in trashcan",
              pCacheObj->name, pNode->key, p->data, T_REF_VAL_GET(p), pNode->data, T_REF_VAL_GET(pNode));
H
Haojun Liao 已提交
544

545
          assert(p->pTNodeHeader == NULL && pNode->pTNodeHeader != NULL);
H
Haojun Liao 已提交
546
        } else {
547
          removeNodeInEntryList(pe, prev, p);
H
Haojun Liao 已提交
548
          uDebug("cache:%s, key:%p, %p successfully removed from hash table, refcnt:%d", pCacheObj->name, pNode->key,
H
Haojun Liao 已提交
549 550 551
                 pNode->data, ref);
          if (ref > 0) {
            assert(pNode->pTNodeHeader == NULL);
H
Haojun Liao 已提交
552
            taosAddToTrashcan(pCacheObj, pNode);
H
Haojun Liao 已提交
553
          } else {  // ref == 0
554
            atomic_sub_fetch_64(&pCacheObj->sizeInBytes, pNode->size);
H
Haojun Liao 已提交
555

556
            int32_t size = (int32_t)pCacheObj->numOfElems;
H
Haojun Liao 已提交
557
            uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, totalNum:%d size:%" PRId64 "bytes",
558
                   pCacheObj->name, pNode->key, pNode->data, pNode->size, size, pCacheObj->sizeInBytes);
H
Haojun Liao 已提交
559

H
Haojun Liao 已提交
560 561 562
            if (pCacheObj->freeFp) {
              pCacheObj->freeFp(pNode->data);
            }
H
Haojun Liao 已提交
563

H
Haojun Liao 已提交
564
            free(pNode);
H
Haojun Liao 已提交
565
          }
H
Haojun Liao 已提交
566
        }
567 568

        taosWUnLockLatch(&pe->latch);
569
      } else {
S
cache  
Shengliang Guan 已提交
570 571
        uDebug("cache:%s, key:%p, %p has been removed from hash table by others already, refcnt:%d", pCacheObj->name,
               pNode->key, pNode->data, ref);
H
Haojun Liao 已提交
572
      }
H
Haojun Liao 已提交
573 574
    }

575
  } else {
H
Haojun Liao 已提交
576
    // NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
S
cache  
Shengliang Guan 已提交
577 578 579
    char *key = pNode->key;
    char *p = pNode->data;

H
Haojun Liao 已提交
580
    int32_t ref = T_REF_DEC(pNode);
H
Haojun Liao 已提交
581
    uDebug("cache:%s, key:%p, %p released, refcnt:%d, data in trashcan:%d", pCacheObj->name, key, p, ref, inTrashcan);
H
Haojun Liao 已提交
582 583
  }
}
H
Haojun Liao 已提交
584

585 586
static bool doRemoveNodeFn(void *param, SCacheNode *pNode) {
  SCacheObjTravSup *ps = (SCacheObjTravSup *)param;
S
cache  
Shengliang Guan 已提交
587
  SCacheObj     *pCacheObj = ps->pCacheObj;
H
Haojun Liao 已提交
588 589 590

  if (T_REF_VAL_GET(pNode) == 0) {
    taosCacheReleaseNode(pCacheObj, pNode);
S
cache  
Shengliang Guan 已提交
591
  } else {  // do add to trashcan
H
Haojun Liao 已提交
592
    taosAddToTrashcan(pCacheObj, pNode);
593
  }
H
Haojun Liao 已提交
594 595 596

  // this node should be remove from hash table
  return false;
597 598
}

599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626
void doTraverseElems(SCacheObj* pCacheObj, bool (*fp)(void *param, SCacheNode* pNode), SCacheObjTravSup* pSup) {
  int32_t numOfEntries = (int32_t)pCacheObj->capacity;
  for (int32_t i = 0; i < numOfEntries; ++i) {
    SCacheEntry *pEntry = &pCacheObj->pEntryList[i];
    if (pEntry->num == 0) {
      continue;
    }

    taosWLockLatch(&pEntry->latch);

    SCacheNode *pNode = pEntry->next;
    while (pNode != NULL) {
      SCacheNode *next = pNode->pNext;

      if (fp(pSup, pNode)) {
        pNode = pNode->pNext;
      } else {
        pEntry->next = next;
        pEntry->num -= 1;

        atomic_sub_fetch_64(&pCacheObj->numOfElems, 1);
        pNode = next;
      }
    }

    taosWUnLockLatch(&pEntry->latch);
  }
}
H
Haojun Liao 已提交
627

628 629 630
void taosCacheEmpty(SCacheObj* pCacheObj) {
  SCacheObjTravSup sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()};
  doTraverseElems(pCacheObj, doRemoveNodeFn, &sup);
H
Haojun Liao 已提交
631
  taosTrashcanEmpty(pCacheObj, false);
632 633 634 635 636 637
}

/**
 * Destroy a cache object. The deleting flag is raised first, which makes taosCachePut and taosCacheAcquireByKey
 * reject the cache. This function then polls every 50 ms until `deleting` is reset (presumably by the refresh
 * worker; not visible here) or the worker is known to have stopped normally, and finally releases all data.
 *
 * If the worker stopped unexpectedly (e.g. the thread was killed before atexit took effect, as in a dll), this
 * returns without releasing anything.
 *
 * @param pCacheObj  cache to destroy; NULL is ignored
 */
void taosCacheCleanup(SCacheObj *pCacheObj) {
  if (pCacheObj == NULL) {
    return;
  }

  // Another thread polls this flag (it is also read with atomic_load_8 below), so publish it atomically.
  atomic_store_8(&pCacheObj->deleting, 1);

  // wait for the refresh thread quit before destroying the cache object.
  // But in the dll, the child thread will be killed before atexit takes effect.
  while (atomic_load_8(&pCacheObj->deleting) != 0) {
    if (refreshWorkerNormalStopped) break;
    if (refreshWorkerUnexpectedStopped) return;
    taosMsleep(50);
  }

  uInfo("cache:%s will be cleaned up", pCacheObj->name);
  doCleanupDataCache(pCacheObj);
}

653 654
SCacheNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration) {
  size_t sizeInBytes = size + sizeof(SCacheNode) + keyLen;
655

656
  SCacheNode *pNewNode = calloc(1, sizeInBytes);
657
  if (pNewNode == NULL) {
658
    terrno = TSDB_CODE_OUT_OF_MEMORY;
659 660 661 662
    uError("failed to allocate memory, reason:%s", strerror(errno));
    return NULL;
  }

663 664
  pNewNode->data    = (char*)pNewNode + sizeof(SCacheNode);
  pNewNode->dataLen = size;
665 666
  memcpy(pNewNode->data, pData, size);

667 668
  pNewNode->key    = (char *)pNewNode + sizeof(SCacheNode) + size;
  pNewNode->keyLen = (uint16_t)keyLen;
669 670 671

  memcpy(pNewNode->key, key, keyLen);

672 673
  pNewNode->addedTime  = (uint64_t)taosGetTimestampMs();
  pNewNode->lifespan   = duration;
S
cache  
Shengliang Guan 已提交
674
  pNewNode->expireTime = pNewNode->addedTime + pNewNode->lifespan;
675 676
  pNewNode->signature  = (uint64_t)pNewNode;
  pNewNode->size       = (uint32_t)sizeInBytes;
677 678 679 680

  return pNewNode;
}

681
void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheNode *pNode) {
H
Haojun Liao 已提交
682
  if (pNode->inTrashcan) { /* node is already in trash */
683
    assert(pNode->pTNodeHeader != NULL && pNode->pTNodeHeader->pData == pNode);
684 685 686
    return;
  }

687
  __trashcan_wr_lock(pCacheObj);
688 689
  STrashElem *pElem = calloc(1, sizeof(STrashElem));
  pElem->pData = pNode;
690 691
  pElem->prev = NULL;
  pElem->next = NULL;
H
Haojun Liao 已提交
692
  pNode->inTrashcan = true;
693
  pNode->pTNodeHeader = pElem;
H
Haojun Liao 已提交
694

695 696 697 698 699 700 701
  pElem->next = pCacheObj->pTrash;
  if (pCacheObj->pTrash) {
    pCacheObj->pTrash->prev = pElem;
  }

  pCacheObj->pTrash = pElem;
  pCacheObj->numOfElemsInTrash++;
702
  __trashcan_unlock(pCacheObj);
703

704 705
  uDebug("cache:%s key:%p, %p move to trashcan, pTrashElem:%p, numOfElem in trashcan:%d", pCacheObj->name, pNode->key,
         pNode->data, pElem, pCacheObj->numOfElemsInTrash);
706 707
}

H
Haojun Liao 已提交
708
void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) {
709
  __trashcan_wr_lock(pCacheObj);
710 711 712

  if (pCacheObj->numOfElemsInTrash == 0) {
    if (pCacheObj->pTrash != NULL) {
H
Haojun Liao 已提交
713
      pCacheObj->pTrash = NULL;
S
cache  
Shengliang Guan 已提交
714 715
      uError("cache:%s, key:inconsistency data in cache, numOfElem in trashcan:%d", pCacheObj->name,
             pCacheObj->numOfElemsInTrash);
716 717
    }

718
    __trashcan_unlock(pCacheObj);
719
    return;
H
hjxilinx 已提交
720
  }
721

S
cache  
Shengliang Guan 已提交
722
  const char *stat[] = {"false", "true"};
H
Haojun Liao 已提交
723
  uDebug("cache:%s start to cleanup trashcan, numOfElem in trashcan:%d, free:%s", pCacheObj->name,
S
cache  
Shengliang Guan 已提交
724
         pCacheObj->numOfElemsInTrash, (force ? stat[1] : stat[0]));
725

H
Haojun Liao 已提交
726
  STrashElem *pElem = pCacheObj->pTrash;
727 728
  while (pElem) {
    T_REF_VAL_CHECK(pElem->pData);
H
Haojun Liao 已提交
729
    assert(pElem->next != pElem && pElem->prev != pElem);
730 731

    if (force || (T_REF_VAL_GET(pElem->pData) == 0)) {
S
cache  
Shengliang Guan 已提交
732 733
      uDebug("cache:%s, key:%p, %p removed from trashcan. numOfElem in trashcan:%d", pCacheObj->name, pElem->pData->key,
             pElem->pData->data, pCacheObj->numOfElemsInTrash - 1);
734

H
Haojun Liao 已提交
735 736 737
      doRemoveElemInTrashcan(pCacheObj, pElem);
      doDestroyTrashcanElem(pCacheObj, pElem);
      pElem = pCacheObj->pTrash;
738 739 740 741 742
    } else {
      pElem = pElem->next;
    }
  }

743
  __trashcan_unlock(pCacheObj);
744 745 746
}

/*
 * Tear down a cache object. Every node leaves the hash table: unreferenced nodes are destroyed, and referenced ones
 * go to the trashcan. The trashcan is then emptied in force mode, whatever the reference counts are, so callers
 * still holding a reference are left with a dangling pointer. Finally the lock, the slot array, the name and the
 * object itself are freed.
 */
void doCleanupDataCache(SCacheObj *pCacheObj) {
  SCacheObjTravSup sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()};
  doTraverseElems(pCacheObj, doRemoveNodeFn, &sup);

  // referenced nodes were moved to the trashcan by the traversal above; force mode frees them as well
  taosTrashcanEmpty(pCacheObj, true);

  __trashcan_lock_destroy(pCacheObj);

  tfree(pCacheObj->pEntryList);
  tfree(pCacheObj->name);
  free(pCacheObj);
}
759

760 761
/*
 * Traversal callback used by doCacheRefresh.
 *
 * A node is dropped when it has expired (expireTime < sup->time) and nobody
 * holds a reference to it. In that case it is released via
 * taosCacheReleaseNode and false is returned so the traversal removes it
 * from the hash table.
 *
 * Every other node is kept (returns true). If the traversal supplied a
 * callback, it is invoked as fp(node data, param1) for each kept node.
 *
 * @param param  SCacheObjTravSup describing the cache, reference time and callback.
 * @param pNode  node currently being visited.
 * @return true to keep the node, false to remove it from the hash table.
 */
bool doRemoveExpiredFn(void *param, SCacheNode *pNode) {
  SCacheObjTravSup *pSup = (SCacheObjTravSup *)param;

  // Keep the node if it has not expired yet, or if it is still referenced.
  // The refcount is only inspected for expired nodes.
  if ((int64_t)pNode->expireTime >= pSup->time || T_REF_VAL_GET(pNode) > 0) {
    if (pSup->fp != NULL) {
      pSup->fp(pNode->data, pSup->param1);
    }
    return true;
  }

  taosCacheReleaseNode(pSup->pCacheObj, pNode);
  return false;
}

S
cache  
Shengliang Guan 已提交
779
/*
 * Run one expiry pass over pCacheObj using `time` as the current timestamp.
 * Expired, unreferenced nodes are released. fp(data, param1) is called for
 * every node that survives, when fp is non-NULL.
 */
static void doCacheRefresh(SCacheObj *pCacheObj, int64_t time, __cache_trav_fn_t fp, void *param1) {
  assert(pCacheObj != NULL);

  SCacheObjTravSup travSup = {
      .time = time,
      .pCacheObj = pCacheObj,
      .param1 = param1,
      .fp = fp,
  };

  doTraverseElems(pCacheObj, doRemoveExpiredFn, &travSup);
}

786
void taosCacheRefreshWorkerUnexpectedStopped(void) {
S
cache  
Shengliang Guan 已提交
787 788
  if (!refreshWorkerNormalStopped) {
    refreshWorkerUnexpectedStopped = true;
789 790 791
  }
}

S
cache  
Shengliang Guan 已提交
792
void *taosCacheTimedRefresh(void *handle) {
793 794
  assert(pCacheArrayList != NULL);
  uDebug("cache refresh thread starts");
795

H
Haojun Liao 已提交
796
  setThreadName("cacheRefresh");
797

S
cache  
Shengliang Guan 已提交
798 799
  const int32_t SLEEP_DURATION = 500;  // 500 ms
  int64_t       count = 0;
800
  atexit(taosCacheRefreshWorkerUnexpectedStopped);
801

S
cache  
Shengliang Guan 已提交
802
  while (1) {
803
    taosMsleep(SLEEP_DURATION);
804 805 806
    if (stopRefreshWorker) {
      goto _end;
    }
H
Haojun Liao 已提交
807

808 809 810
    pthread_mutex_lock(&guard);
    size_t size = taosArrayGetSize(pCacheArrayList);
    pthread_mutex_unlock(&guard);
811

812
    count += 1;
813

S
cache  
Shengliang Guan 已提交
814
    for (int32_t i = 0; i < size; ++i) {
815
      pthread_mutex_lock(&guard);
S
cache  
Shengliang Guan 已提交
816
      SCacheObj *pCacheObj = taosArrayGetP(pCacheArrayList, i);
817

818
      if (pCacheObj == NULL) {
819 820 821
        uError("object is destroyed. ignore and try next");
        pthread_mutex_unlock(&guard);
        continue;
822
      }
823

824 825
      // check if current cache object will be deleted every 500ms.
      if (pCacheObj->deleting) {
826 827 828
        taosArrayRemove(pCacheArrayList, i);
        size = taosArrayGetSize(pCacheArrayList);

S
cache  
Shengliang Guan 已提交
829 830
        uDebug("%s is destroying, remove it from refresh list, remain cache obj:%" PRIzu, pCacheObj->name, size);
        pCacheObj->deleting = 0;  // reset the deleting flag to enable pCacheObj to continue releasing resources.
831

832 833
        pthread_mutex_unlock(&guard);
        continue;
834
      }
835

836 837
      pthread_mutex_unlock(&guard);

838 839 840 841
      if ((count % pCacheObj->checkTick) != 0) {
        continue;
      }

842
      size_t elemInHash = pCacheObj->numOfElems;
843 844 845 846 847 848 849 850 851 852
      if (elemInHash + pCacheObj->numOfElemsInTrash == 0) {
        continue;
      }

      uDebug("%s refresh thread scan", pCacheObj->name);
      pCacheObj->statistics.refreshCount++;

      // refresh data in hash table
      if (elemInHash > 0) {
        int64_t now = taosGetTimestampMs();
A
AlexDuan 已提交
853
        doCacheRefresh(pCacheObj, now, NULL, NULL);
854 855 856 857
      }

      taosTrashcanEmpty(pCacheObj, false);
    }
858 859
  }

S
cache  
Shengliang Guan 已提交
860
_end:
861
  taosArrayDestroy(pCacheArrayList);
H
Haojun Liao 已提交
862 863 864

  pCacheArrayList = NULL;
  pthread_mutex_destroy(&guard);
S
cache  
Shengliang Guan 已提交
865
  refreshWorkerNormalStopped = true;
H
Haojun Liao 已提交
866

867
  uDebug("cache refresh thread quits");
868
  return NULL;
dengyihao's avatar
dengyihao 已提交
869
}
870

S
cache  
Shengliang Guan 已提交
871
/*
 * Synchronously run one refresh pass over pCacheObj, using the current time in ms.
 *
 * Expired, unreferenced nodes are released. When fp is non-NULL, it is called
 * as fp(data, param1) for every node that is kept. A NULL pCacheObj is a no-op.
 */
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void *param1) {
  if (pCacheObj != NULL) {
    doCacheRefresh(pCacheObj, taosGetTimestampMs(), fp, param1);
  }
}
879

S
cache  
Shengliang Guan 已提交
880
/*
 * Ask the background refresh thread to exit.
 *
 * This only sets the stop flag. The thread notices it the next time it wakes
 * from its sleep, and this function does not wait for that to happen.
 */
void taosStopCacheRefreshWorker(void) {
  stopRefreshWorker = true;
}