tcache.c 20.6 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16 17 18
#define _DEFAULT_SOURCE
#include "os.h"
#include "tulog.h"
H
hzcheng 已提交
19 20 21
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
S
slguan 已提交
22 23 24
#include "tcache.h"
#include "hash.h"
#include "hashfunc.h"
H
hzcheng 已提交
25

26
/* Acquire the cache lock for exclusive (write) access. */
static FORCE_INLINE void __cache_wr_lock(SCacheObj *pCacheObj) {
#ifndef LINUX
  /* non-Linux builds guard the cache with a plain mutex */
  pthread_mutex_lock(&pCacheObj->lock);
#else
  pthread_rwlock_wrlock(&pCacheObj->lock);
#endif
}

34
/* Acquire the cache lock for shared (read) access; without LINUX this is the same mutex as the write lock. */
static FORCE_INLINE void __cache_rd_lock(SCacheObj *pCacheObj) {
#ifndef LINUX
  pthread_mutex_lock(&pCacheObj->lock);
#else
  pthread_rwlock_rdlock(&pCacheObj->lock);
#endif
}

42
/* Release the cache lock taken by __cache_rd_lock() or __cache_wr_lock(). */
static FORCE_INLINE void __cache_unlock(SCacheObj *pCacheObj) {
#ifndef LINUX
  pthread_mutex_unlock(&pCacheObj->lock);
#else
  pthread_rwlock_unlock(&pCacheObj->lock);
#endif
}

50
/* Initialise the cache lock with default attributes; returns the result of the pthread init call. */
static FORCE_INLINE int32_t __cache_lock_init(SCacheObj *pCacheObj) {
#ifndef LINUX
  return pthread_mutex_init(&pCacheObj->lock, NULL);
#else
  return pthread_rwlock_init(&pCacheObj->lock, NULL);
#endif
}

58
/* Destroy the cache lock created by __cache_lock_init(). */
static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pCacheObj) {
#ifndef LINUX
  pthread_mutex_destroy(&pCacheObj->lock);
#else
  pthread_rwlock_destroy(&pCacheObj->lock);
#endif
}

66
#if 0
/* Disabled helper: `data` points at a slot holding a SCacheDataNode pointer; the node it refers to is freed. */
static FORCE_INLINE void taosFreeNode(void *data) {
  SCacheDataNode **ppNode = (SCacheDataNode **)data;
  free(*ppNode);
}
#endif
H
hzcheng 已提交
72 73 74 75 76 77 78

/**
 * @param key      key of object for hash, usually a null-terminated string
 * @param keyLen   length of key
 * @param pData    the actual data. It must be one contiguous memory block that contains no pointers:
 *                 the block is copied byte by byte, so a copied pointer leads to invalid memory access.
 * @param size     size of block
 * @param duration total survival time of the node, counted from now
 * @return         SCacheDataNode
 */
82
static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration);
H
hzcheng 已提交
83 84

/**
 * add the object node into trash; the object is closed for referencing once it has been added to trash.
 * It will not be removed until pNode->refCount == 0
 * @param pCacheObj    Cache object
 * @param pNode   Cache slot object
 */
90 91 92 93 94 95 96 97
static void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode);

/**
 * remove node in trash can
 * @param pCacheObj 
 * @param pElem 
 */
static void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem);
H
hzcheng 已提交
98 99 100 101

/**
 * remove nodes in trash with refCount == 0 in cache
 * @param pCacheObj
 * @param force   force mode. If true, remove data in trash without checking the refcount.
 *                This may cause corruption, so force mode only applies right before the cache is closed
 */
106
static void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force);
H
hzcheng 已提交
107 108 109

/**
 * release node
110
 * @param pCacheObj      cache object
H
hzcheng 已提交
111 112
 * @param pNode     data node
 */
113
static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
H
hzcheng 已提交
114
  if (pNode->signature != (uint64_t)pNode) {
S
slguan 已提交
115
    uError("key:%s, %p data is invalid, or has been released", pNode->key, pNode);
H
hzcheng 已提交
116 117
    return;
  }
118
  
H
hjxilinx 已提交
119
  int32_t size = pNode->size;
120
  taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
121 122 123 124

  uDebug("key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes, cacheName:%s",
         pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, size,
         pCacheObj->cacheName);
S
Shengliang Guan 已提交
125
  if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data);
H
hzcheng 已提交
126 127 128 129 130
  free(pNode);
}

/**
 * move the old node into trash
131
 * @param pCacheObj
H
hzcheng 已提交
132 133
 * @param pNode
 */
134 135 136
static FORCE_INLINE void taosCacheMoveToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
  taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
  taosAddToTrash(pCacheObj, pNode);
H
hzcheng 已提交
137 138 139 140
}

/**
 * update data in cache
141
 * @param pCacheObj
H
hzcheng 已提交
142 143 144 145 146 147 148
 * @param pNode
 * @param key
 * @param keyLen
 * @param pData
 * @param dataSize
 * @return
 */
H
hjxilinx 已提交
149 150
static SCacheDataNode *taosUpdateCacheImpl(SCacheObj *pCacheObj, SCacheDataNode *pNode, const char *key, int32_t keyLen,
                                           const void *pData, uint32_t dataSize, uint64_t duration) {
151 152
  SCacheDataNode *pNewNode = NULL;
  
H
hjxilinx 已提交
153
  // only a node is not referenced by any other object, in-place update it
154
  if (T_REF_VAL_GET(pNode) == 0) {
155
    size_t newSize = sizeof(SCacheDataNode) + dataSize + keyLen + 1;
156 157
    
    pNewNode = (SCacheDataNode *)realloc(pNode, newSize);
H
hzcheng 已提交
158 159 160
    if (pNewNode == NULL) {
      return NULL;
    }
161
    
162
    memset(pNewNode, 0, newSize);
H
hzcheng 已提交
163 164
    pNewNode->signature = (uint64_t)pNewNode;
    memcpy(pNewNode->data, pData, dataSize);
165 166 167 168 169
    
    pNewNode->key = (char *)pNewNode + sizeof(SCacheDataNode) + dataSize;
    pNewNode->keySize = keyLen;
    memcpy(pNewNode->key, key, keyLen);
    
H
hjxilinx 已提交
170
    // update the timestamp information for updated key/value
171
    pNewNode->addedTime = taosGetTimestampMs();
H
Haojun Liao 已提交
172
    pNewNode->lifespan = duration;
173 174 175
    
    T_REF_INC(pNewNode);
    
H
hjxilinx 已提交
176
    // the address of this node may be changed, so the prev and next element should update the corresponding pointer
177
    taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNewNode, sizeof(void *));
H
hzcheng 已提交
178
  } else {
179
    taosCacheMoveToTrash(pCacheObj, pNode);
180
    
181
    pNewNode = taosCreateCacheNode(key, keyLen, pData, dataSize, duration);
H
hzcheng 已提交
182 183 184
    if (pNewNode == NULL) {
      return NULL;
    }
185 186 187 188
    
    T_REF_INC(pNewNode);
    
    // addedTime new element to hashtable
189
    taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNewNode, sizeof(void *));
H
hzcheng 已提交
190
  }
191
  
H
hzcheng 已提交
192 193 194 195
  return pNewNode;
}

/**
196
 * addedTime data into hash table
H
hzcheng 已提交
197 198 199
 * @param key
 * @param pData
 * @param size
200
 * @param pCacheObj
H
hzcheng 已提交
201 202 203 204
 * @param keyLen
 * @param pNode
 * @return
 */
H
hjxilinx 已提交
205
static FORCE_INLINE SCacheDataNode *taosAddToCacheImpl(SCacheObj *pCacheObj, const char *key, size_t keyLen, const void *pData,
206
                                                       size_t dataSize, uint64_t duration) {
207
  SCacheDataNode *pNode = taosCreateCacheNode(key, keyLen, pData, dataSize, duration);
H
hzcheng 已提交
208 209 210
  if (pNode == NULL) {
    return NULL;
  }
211 212
  
  T_REF_INC(pNode);
213
  taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode, sizeof(void *));
H
hzcheng 已提交
214 215 216
  return pNode;
}

217 218 219 220 221
/**
 * do cleanup the taos cache
 * @param pCacheObj
 */
static void doCleanupDataCache(SCacheObj *pCacheObj);
222

H
hzcheng 已提交
223
/**
 * refresh the cache every pCacheObj->refreshTime: remove data whose refcount == 0 from both the hash list and the trash
 * @param handle   Cache object handle
 */
227
static void* taosCacheRefresh(void *handle);
H
hjxilinx 已提交
228

229
/**
 * Create a cache object and start its background refresh thread.
 *
 * @param keyType               key type, used to pick the default hash function
 * @param refreshTimeInSeconds  refresh interval in seconds; must be greater than 0
 * @param extendLifespan        stored in the cache object; when set, acquiring an entry may extend its lifespan
 * @param fn                    optional callback invoked with a node's data right before the node is freed
 * @param cacheName             name used in log messages; only the pointer is stored, the string is not copied
 * @return                      the new cache object, or NULL on an invalid interval or any initialisation failure
 */
SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn, const char* cacheName) {
  if (refreshTimeInSeconds <= 0) {
    return NULL;
  }

  SCacheObj *pCacheObj = (SCacheObj *)calloc(1, sizeof(SCacheObj));
  if (pCacheObj == NULL) {
    uError("failed to allocate memory, reason:%s", strerror(errno));
    return NULL;
  }

  pCacheObj->pHashTable = taosHashInit(128, taosGetDefaultHashFunction(keyType), false);
  pCacheObj->cacheName = cacheName;
  if (pCacheObj->pHashTable == NULL) {
    // report before free(), which is allowed to modify errno
    uError("failed to allocate memory, reason:%s", strerror(errno));
    free(pCacheObj);
    return NULL;
  }

  // set free cache node callback function for hash table
  pCacheObj->freeFp = fn;
  pCacheObj->refreshTime = refreshTimeInSeconds * 1000;
  pCacheObj->extendLifespan = extendLifespan;

  // pthread functions report failure through their return value, not through errno
  int32_t code = __cache_lock_init(pCacheObj);
  if (code != 0) {
    uError("failed to init lock, reason:%s", strerror(code));
    taosHashCleanup(pCacheObj->pHashTable);
    free(pCacheObj);
    return NULL;
  }

  pthread_attr_t thattr;
  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);

  code = pthread_create(&pCacheObj->refreshWorker, &thattr, taosCacheRefresh, pCacheObj);
  pthread_attr_destroy(&thattr);

  if (code != 0) {
    // without a worker, taosCacheCleanup() would later join a thread that never existed
    uError("failed to create cache refresh thread, reason:%s", strerror(code));
    __cache_lock_destroy(pCacheObj);
    taosHashCleanup(pCacheObj->pHashTable);
    free(pCacheObj);
    return NULL;
  }

  return pCacheObj;
}
H
hzcheng 已提交
270

271 272
/* Create a cache object; every argument is passed through unchanged to taosCacheInitWithCb(). */
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn, const char* cacheName) {
  SCacheObj *pCreated = taosCacheInitWithCb(keyType, refreshTimeInSeconds, extendLifespan, fn, cacheName);
  return pCreated;
}

H
Haojun Liao 已提交
275
/**
 * Store a copy of pData under key, replacing any value already cached for that key.
 * The whole operation runs under the cache write lock.
 *
 * @param pCacheObj  cache object
 * @param key        key of the entry
 * @param keyLen     length of key in bytes
 * @param pData      payload to copy into the cache
 * @param dataSize   size of pData in bytes
 * @param duration   lifespan of the entry in seconds (converted to milliseconds here)
 * @return           pointer to the cached copy of the data, or NULL if the cache is invalid
 *                   or memory could not be allocated
 */
void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int duration) {
  if (pCacheObj == NULL || pCacheObj->pHashTable == NULL) {
    return NULL;
  }

  // widen before multiplying: `long` is only 32 bits on some platforms
  uint64_t lifespanMs = (uint64_t)((int64_t)duration * 1000);

  SCacheDataNode *pNode = NULL;

  __cache_wr_lock(pCacheObj);
  SCacheDataNode **pt = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
  SCacheDataNode * pOld = (pt != NULL) ? (*pt) : NULL;

  if (pOld == NULL) {  // no entry yet, add to cache
    pNode = taosAddToCacheImpl(pCacheObj, key, keyLen, pData, dataSize, lifespanMs);
    if (NULL != pNode) {
      pCacheObj->totalSize += pNode->size;

      uDebug("key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", totalNum:%d totalSize:%" PRId64
             "bytes size:%" PRId64 "bytes, cacheName:%s",
             key, pNode->data, pNode->addedTime, (pNode->lifespan * pNode->extendFactor + pNode->addedTime),
             (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, (int64_t)dataSize,
             pCacheObj->cacheName);
    } else {
      uError("key:%p, failed to added into cache, out of memory, cacheName:%s", key, pCacheObj->cacheName);
    }
  } else {  // old data exists, update the node
    pNode = taosUpdateCacheImpl(pCacheObj, pOld, key, (int32_t)keyLen, pData, (uint32_t)dataSize, lifespanMs);
    if (NULL != pNode) {
      uDebug("key:%p, %p exist in cache, updated, cacheName:%s", key, pNode->data, pCacheObj->cacheName);
    } else {
      // the update returns NULL when it cannot allocate memory; pNode must not be dereferenced then
      uError("key:%p, failed to update cache, out of memory, cacheName:%s", key, pCacheObj->cacheName);
    }
  }

  __cache_unlock(pCacheObj);

  return (pNode != NULL) ? pNode->data : NULL;
}

H
Haojun Liao 已提交
308
/**
 * Look up key and, if found, take a reference on the cached node.
 * Hit, miss and total-access counters of the cache are updated.
 *
 * @param pCacheObj  cache object
 * @param key        key to look up
 * @param keyLen     length of key in bytes
 * @return           pointer to the cached data with its reference count incremented,
 *                   or NULL if the cache is NULL/empty or the key is not present
 */
void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) {
  if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) {
    return NULL;
  }

  SCacheDataNode *pNode = NULL;
  int32_t         ref = 0;

  __cache_rd_lock(pCacheObj);

  SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
  if (ptNode != NULL) {
    // ptNode points into the hash table and is only safe to read while the lock is held,
    // so copy the node pointer out now; the reference taken below is what the caller keeps.
    pNode = *ptNode;
    ref = T_REF_INC(pNode);

    // if the remained life span is less then the pNode->lifeSpan, add up one lifespan
    if (pCacheObj->extendLifespan) {
      int64_t now = taosGetTimestampMs();

      if ((now - pNode->addedTime) < pNode->lifespan * pNode->extendFactor) {
        pNode->extendFactor += 1;
        uDebug("key:%p extend life time to %"PRId64, key, pNode->lifespan * pNode->extendFactor + pNode->addedTime);
      }
    }
  }

  __cache_unlock(pCacheObj);

  if (pNode != NULL) {
    atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
    uDebug("key:%p, %p is retrieved from cache, refcnt:%d, cacheName:%s", key, pNode->data, ref, pCacheObj->cacheName);
  } else {
    atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
    uDebug("key:%p, not in cache, retrieved failed, cacheName:%s", key, pCacheObj->cacheName);
  }

  atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
  return (pNode != NULL) ? pNode->data : NULL;
}

345
/**
 * Look up key, take a reference on the node and extend its lifetime by one more lifespan.
 * Hit, miss and total-access counters of the cache are updated.
 *
 * @param pCacheObj   cache object
 * @param key         key to look up
 * @param keyLen      length of key in bytes
 * @param expireTime  currently not applied: only the extend factor of the node is increased
 * @return            pointer to the cached data with its reference count incremented,
 *                    or NULL if the cache is NULL/empty or the key is not present
 */
void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, void *key, size_t keyLen, uint64_t expireTime) {
  if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) {
    return NULL;
  }

  (void)expireTime;

  SCacheDataNode *pNode = NULL;
  int32_t         ref = 0;

  __cache_rd_lock(pCacheObj);

  SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
  if (ptNode != NULL) {
    // ptNode points into the hash table and is only safe to read while the lock is held
    pNode = *ptNode;
    ref = T_REF_INC(pNode);
    pNode->extendFactor += 1;
  }

  __cache_unlock(pCacheObj);

  if (pNode != NULL) {
    atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
    uDebug("key:%p, %p expireTime is updated in cache, refcnt:%d, cacheName:%s", key, pNode->data, ref,
           pCacheObj->cacheName);
  } else {
    atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
    uDebug("key:%p, not in cache, retrieved failed, cacheName:%s", key, pCacheObj->cacheName);
  }

  atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
  return (pNode != NULL) ? pNode->data : NULL;
}

374 375
/**
 * Take one more reference on a cached item through a data pointer that was handed out earlier.
 *
 * @param pCacheObj  cache object
 * @param data       data pointer previously returned by the cache
 * @return           data itself, or NULL if an argument is NULL or data does not belong to a valid node
 */
void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
  if (pCacheObj == NULL || data == NULL) {
    return NULL;
  }

  // the payload is embedded in the node: step back to the start of the node that contains it
  SCacheDataNode *pOwner = (SCacheDataNode *)((char *)data - offsetof(SCacheDataNode, data));
  if ((uint64_t)pOwner != pOwner->signature) {
    uError("key: %p the data from cache is invalid", pOwner);
    return NULL;
  }

  int32_t refCnt = T_REF_INC(pOwner);
  uDebug("%p acquired by data in cache, refcnt:%d, cacheName:%s", pOwner->data, refCnt, pCacheObj->cacheName);

  // while the node has not outlived its current lifespan, grant it one more lifespan
  if (pCacheObj->extendLifespan) {
    int64_t nowMs = taosGetTimestampMs();

    if ((nowMs - pOwner->addedTime) < pOwner->lifespan * pOwner->extendFactor) {
      pOwner->extendFactor += 1;
      uDebug("%p extend life time to %" PRId64, pOwner->data,
             pOwner->lifespan * pOwner->extendFactor + pOwner->addedTime);
    }
  }

  // the data was already referenced by at least one object, so the count must now be at least 2
  assert(refCnt >= 2);
  return data;
}

404 405
/**
 * Hand a cached data pointer over to a new holder: the pointer is returned and the
 * caller's variable is set to NULL. The reference count of the node is not changed.
 *
 * @param pCacheObj  cache object
 * @param data       address of the data pointer to transfer; *data is cleared on success
 * @return           the transferred data pointer, or NULL if an argument (or *data) is NULL
 *                   or *data does not belong to a valid node
 */
void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) {
  // *data must be checked as well: the node header is located relative to it and read below
  if (pCacheObj == NULL || data == NULL || (*data) == NULL) return NULL;

  size_t          offset = offsetof(SCacheDataNode, data);
  SCacheDataNode *ptNode = (SCacheDataNode *)((char *)(*data) - offset);

  if (ptNode->signature != (uint64_t)ptNode) {
    uError("key: %p the data from cache is invalid", ptNode);
    return NULL;
  }

  assert(T_REF_VAL_GET(ptNode) >= 1);

  char *d = *data;

  // clear its reference to old area
  *data = NULL;

  return d;
}
424

425 426
/**
 * Give back one reference on a cached item and clear the caller's pointer.
 *
 * @param pCacheObj  cache object
 * @param data       address of the data pointer to release; *data is set to NULL
 * @param _remove    if true and the node is not yet in the trash, the node is also taken out of the
 *                   hash table: freed at once when no reference is left, moved to the trash otherwise
 */
void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
  if (pCacheObj == NULL || data == NULL || (*data) == NULL ||
      (taosHashGetSize(pCacheObj->pHashTable) + pCacheObj->numOfElemsInTrash == 0)) {
    return;
  }

  size_t offset = offsetof(SCacheDataNode, data);

  SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset);
  if (pNode->signature != (uint64_t)pNode) {
    uError("%p, release invalid cache data", pNode);
    return;
  }

  *data = NULL;

  if (!_remove) {
    // Once the count reaches 0 the refresh thread may free the node at any moment,
    // so everything the log line needs is read before the reference is dropped.
    void *pKey = pNode->key;
    void *pData = pNode->data;

    int32_t ref = T_REF_DEC(pNode);
    uDebug("key:%p, %p is released, refcnt:%d, cacheName:%s", pKey, pData, ref, pCacheObj->cacheName);
    return;
  }

  // The refresh thread frees unreferenced nodes while holding the write lock. Holding that lock
  // from before the decrement keeps the node alive while it is inspected below.
  __cache_wr_lock(pCacheObj);

  int32_t ref = T_REF_DEC(pNode);
  uDebug("key:%p, %p is released, refcnt:%d, cacheName:%s", pNode->key, pNode->data, ref, pCacheObj->cacheName);

  if (!pNode->inTrashCan) {
    if (T_REF_VAL_GET(pNode) == 0) {
      // remove directly, if not referenced by other users
      taosCacheReleaseNode(pCacheObj, pNode);
    } else {
      // still referenced elsewhere: the trash keeps the node until the last reference is gone
      taosCacheMoveToTrash(pCacheObj, pNode);
    }
  }

  __cache_unlock(pCacheObj);
}

/**
 * Move every node of the hash table into the trash, then free the trash entries that
 * are no longer referenced. The walk stops early once the cache is flagged as deleting.
 *
 * @param pCacheObj  cache object; NULL is ignored
 */
void taosCacheEmpty(SCacheObj *pCacheObj) {
  // same guard as the other public entry points; pCacheObj is dereferenced right below
  if (pCacheObj == NULL) {
    return;
  }

  SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);

  __cache_wr_lock(pCacheObj);
  while (taosHashIterNext(pIter)) {
    if (pCacheObj->deleting == 1) {
      break;
    }

    SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
    taosCacheMoveToTrash(pCacheObj, pNode);
  }
  __cache_unlock(pCacheObj);

  taosHashDestroyIter(pIter);
  taosTrashCanEmpty(pCacheObj, false);
}

/**
 * Shut the cache down: flag it as deleting, wait for the refresh thread to exit,
 * then release everything through doCleanupDataCache().
 *
 * @param pCacheObj  cache object; NULL is ignored
 */
void taosCacheCleanup(SCacheObj *pCacheObj) {
  if (pCacheObj != NULL) {
    // the refresh thread polls this flag and leaves its loop once it is set
    pCacheObj->deleting = 1;
    pthread_join(pCacheObj->refreshWorker, NULL);

    uInfo("cacheName:%p, will be cleanuped", pCacheObj->cacheName);
    doCleanupDataCache(pCacheObj);
  }
}

SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size,
                                           uint64_t duration) {
  size_t totalSize = size + sizeof(SCacheDataNode) + keyLen + 1;

  SCacheDataNode *pNewNode = calloc(1, totalSize);
  if (pNewNode == NULL) {
    uError("failed to allocate memory, reason:%s", strerror(errno));
    return NULL;
  }

  memcpy(pNewNode->data, pData, size);

  pNewNode->key = (char *)pNewNode + sizeof(SCacheDataNode) + size;
  pNewNode->keySize = keyLen;

  memcpy(pNewNode->key, key, keyLen);

H
Haojun Liao 已提交
505 506 507 508 509
  pNewNode->addedTime    = (uint64_t)taosGetTimestampMs();
  pNewNode->lifespan     = duration;
  pNewNode->extendFactor = 1;
  pNewNode->signature    = (uint64_t)pNewNode;
  pNewNode->size         = (uint32_t)totalSize;
510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532

  return pNewNode;
}

/*
 * Put a node at the head of the trash list and mark it as being in the trash.
 * A node that is already marked is left alone. If the list element cannot be allocated the
 * node is not tracked by the trash (it stays allocated) and an error is logged.
 */
void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
  if (pNode->inTrashCan) { /* node is already in trash */
    return;
  }

  STrashElem *pElem = calloc(1, sizeof(STrashElem));
  if (pElem == NULL) {
    // dereferencing a failed allocation would crash; report it and leave the trash list untouched
    uError("key:%p, %p failed to move to trash, out of memory", pNode->key, pNode->data);
    return;
  }

  pElem->pData = pNode;

  // link in front of the current head
  pElem->next = pCacheObj->pTrash;
  if (pCacheObj->pTrash) {
    pCacheObj->pTrash->prev = pElem;
  }

  pElem->prev = NULL;
  pCacheObj->pTrash = pElem;

  pNode->inTrashCan = true;
  pCacheObj->numOfElemsInTrash++;

  uDebug("key:%p, %p move to trash, numOfElem in trash:%d", pNode->key, pNode->data, pCacheObj->numOfElemsInTrash);
}

/*
 * Unlink one element from the trash list and free both the element and the node it holds.
 * An element whose node fails the signature check is only reported and stays in the list.
 */
void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem) {
  SCacheDataNode *pVictim = pElem->pData;

  if ((uint64_t)pVictim != pVictim->signature) {
    uError("key:sig:0x%" PRIx64 " %p data has been released, ignore", pVictim->signature, pVictim);
    return;
  }

  STrashElem *pBefore = pElem->prev;
  STrashElem *pAfter = pElem->next;

  pCacheObj->numOfElemsInTrash--;

  if (pBefore == NULL) {
    /* the element is the list head, so the head moves on */
    pCacheObj->pTrash = pAfter;
  } else {
    pBefore->next = pAfter;
  }

  if (pAfter != NULL) {
    pAfter->prev = pBefore;
  }

  // invalidate the node before it is handed to the user callback and freed
  pVictim->signature = 0;
  if (pCacheObj->freeFp) {
    pCacheObj->freeFp(pVictim->data);
  }

  free(pVictim);
  free(pElem);
}

/*
 * Walk the trash list under the write lock and free every element whose node is no longer
 * referenced; with force set, every element is freed regardless of its reference count.
 */
void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) {
  __cache_wr_lock(pCacheObj);

  if (pCacheObj->numOfElemsInTrash == 0) {
    // nothing is counted: make sure the list head agrees with the counter
    if (pCacheObj->pTrash != NULL) {
      uError("key:inconsistency data in cache, numOfElem in trash:%d", pCacheObj->numOfElemsInTrash);
    }
    pCacheObj->pTrash = NULL;
  } else {
    STrashElem *pCur = pCacheObj->pTrash;

    while (pCur != NULL) {
      T_REF_VAL_CHECK(pCur->pData);

      // an element that links to itself would never let the walk end
      if (pCur->next == pCur) {
        pCur->next = NULL;
      }

      // remember the successor first, pCur may be freed below
      STrashElem *pFollowing = pCur->next;

      if (force || (T_REF_VAL_GET(pCur->pData) == 0)) {
        uDebug("key:%p, %p removed from trash. numOfElem in trash:%d", pCur->pData->key, pCur->pData->data,
               pCacheObj->numOfElemsInTrash - 1);
        taosRemoveFromTrashCan(pCacheObj, pCur);
      }

      pCur = pFollowing;
    }
  }

  __cache_unlock(pCacheObj);
}

/*
 * Tear the cache object down: free the unreferenced nodes of the hash table, destroy the
 * table, force-empty the trash, destroy the lock and finally free the cache object itself.
 */
void doCleanupDataCache(SCacheObj *pCacheObj) {
  __cache_wr_lock(pCacheObj);

  SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);
  while (taosHashIterNext(pIter)) {
    SCacheDataNode *pEntry = *(SCacheDataNode **)taosHashIterGet(pIter);

    if (T_REF_VAL_GET(pEntry) > 0) {
      // still referenced: only report it, the node is not freed here
      uDebug("key:%p, %p will not remove from cache, refcnt:%d, cacheName:%s", pEntry->key, pEntry->data,
             T_REF_VAL_GET(pEntry), pCacheObj->cacheName);
    } else {
      taosCacheReleaseNode(pCacheObj, pEntry);
    }
  }
  taosHashDestroyIter(pIter);

  taosHashCleanup(pCacheObj->pHashTable);
  __cache_unlock(pCacheObj);

  // taosTrashCanEmpty() takes the lock itself, hence the unlock above
  taosTrashCanEmpty(pCacheObj, true);
  __cache_lock_destroy(pCacheObj);

  memset(pCacheObj, 0, sizeof(SCacheObj));
  free(pCacheObj);
}
623 624 625 626

void* taosCacheRefresh(void *handle) {
  SCacheObj *pCacheObj = (SCacheObj *)handle;
  if (pCacheObj == NULL) {
627
    uDebug("object is destroyed. no refresh retry");
628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648
    return NULL;
  }

  const int32_t SLEEP_DURATION = 500; //500 ms
  int64_t totalTick = pCacheObj->refreshTime / SLEEP_DURATION;

  int64_t count = 0;
  while(1) {
    taosMsleep(500);

    // check if current cache object will be deleted every 500ms.
    if (pCacheObj->deleting) {
      break;
    }

    if (++count < totalTick) {
      continue;
    }

    // reset the count value
    count = 0;
H
Haojun Liao 已提交
649 650
    size_t elemInHash = taosHashGetSize(pCacheObj->pHashTable);
    if (elemInHash + pCacheObj->numOfElemsInTrash == 0) {
651 652 653 654 655
      continue;
    }

    pCacheObj->statistics.refreshCount++;

H
Haojun Liao 已提交
656 657 658
    // refresh data in hash table
    if (elemInHash > 0) {
      int64_t expiredTime = taosGetTimestampMs();
659

H
Haojun Liao 已提交
660 661 662 663 664 665 666 667
      SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);

      __cache_wr_lock(pCacheObj);
      while (taosHashIterNext(pIter)) {
        SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
        if ((pNode->addedTime + pNode->lifespan * pNode->extendFactor) <= expiredTime && T_REF_VAL_GET(pNode) <= 0) {
          taosCacheReleaseNode(pCacheObj, pNode);
        }
668 669
      }

H
Haojun Liao 已提交
670 671 672 673
      __cache_unlock(pCacheObj);

      taosHashDestroyIter(pIter);
    }
674 675 676 677 678

    taosTrashCanEmpty(pCacheObj, false);
  }

  return NULL;
dengyihao's avatar
dengyihao 已提交
679
}