tcache.c 21.1 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16 17 18
#define _DEFAULT_SOURCE
#include "os.h"
#include "tulog.h"
H
hzcheng 已提交
19 20 21
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
S
slguan 已提交
22 23 24
#include "tcache.h"
#include "hash.h"
#include "hashfunc.h"
H
hzcheng 已提交
25

26
static FORCE_INLINE void __cache_wr_lock(SCacheObj *pCacheObj) {
27
#if defined(LINUX)
28
  pthread_rwlock_wrlock(&pCacheObj->lock);
29
#else
30
  pthread_mutex_lock(&pCacheObj->lock);
31 32 33
#endif
}

34
static FORCE_INLINE void __cache_rd_lock(SCacheObj *pCacheObj) {
35
#if defined(LINUX)
36
  pthread_rwlock_rdlock(&pCacheObj->lock);
37
#else
38
  pthread_mutex_lock(&pCacheObj->lock);
39 40 41
#endif
}

42
static FORCE_INLINE void __cache_unlock(SCacheObj *pCacheObj) {
43
#if defined(LINUX)
44
  pthread_rwlock_unlock(&pCacheObj->lock);
45
#else
46
  pthread_mutex_unlock(&pCacheObj->lock);
47 48 49
#endif
}

50
static FORCE_INLINE int32_t __cache_lock_init(SCacheObj *pCacheObj) {
51
#if defined(LINUX)
52
  return pthread_rwlock_init(&pCacheObj->lock, NULL);
53
#else
54
  return pthread_mutex_init(&pCacheObj->lock, NULL);
55 56 57
#endif
}

58
static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pCacheObj) {
59
#if defined(LINUX)
60
  pthread_rwlock_destroy(&pCacheObj->lock);
61
#else
62
  pthread_mutex_destroy(&pCacheObj->lock);
63 64 65
#endif
}

66
#if 0
67 68 69
static FORCE_INLINE void taosFreeNode(void *data) {
  SCacheDataNode *pNode = *(SCacheDataNode **)data;
  free(pNode);
H
hzcheng 已提交
70
}
71
#endif
H
hzcheng 已提交
72 73 74 75 76 77 78

/**
 * @param key      key of object for hash, usually a null-terminated string
 * @param keyLen   length of key
 * @param pData    actually data. required a consecutive memory block, no pointer is allowed
 *                 in pData. Pointer copy causes memory access error.
 * @param size     size of block
79 80
 * @param lifespan total survial expiredTime from now
 * @return         SCacheDataNode
H
hzcheng 已提交
81
 */
82
static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration);
H
hzcheng 已提交
83 84

/**
85
 * addedTime object node into trash, and this object is closed for referencing if it is addedTime to trash
H
hzcheng 已提交
86
 * It will be removed until the pNode->refCount == 0
87
 * @param pCacheObj    Cache object
H
hzcheng 已提交
88 89
 * @param pNode   Cache slot object
 */
90 91 92 93 94 95 96 97
static void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode);

/**
 * remove node in trash can
 * @param pCacheObj 
 * @param pElem 
 */
static void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem);
H
hzcheng 已提交
98 99 100 101

/**
 * remove nodes in trash with refCount == 0 in cache
 * @param pNode
102
 * @param pCacheObj
H
hzcheng 已提交
103 104 105
 * @param force   force model, if true, remove data in trash without check refcount.
 *                may cause corruption. So, forece model only applys before cache is closed
 */
106
static void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force);
H
hzcheng 已提交
107 108 109

/**
 * release node
110
 * @param pCacheObj      cache object
H
hzcheng 已提交
111 112
 * @param pNode     data node
 */
113
static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
H
hzcheng 已提交
114
  if (pNode->signature != (uint64_t)pNode) {
S
slguan 已提交
115
    uError("key:%s, %p data is invalid, or has been released", pNode->key, pNode);
H
hzcheng 已提交
116 117
    return;
  }
118
  
119
  taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
120

121
  pCacheObj->totalSize -= pNode->size;
H
Haojun Liao 已提交
122
  uDebug("cache:%s, key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes",
123 124 125
         pCacheObj->name, pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize,
         pNode->size);

S
Shengliang Guan 已提交
126
  if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data);
H
hzcheng 已提交
127 128 129 130 131
  free(pNode);
}

/**
 * move the old node into trash
132
 * @param pCacheObj
H
hzcheng 已提交
133 134
 * @param pNode
 */
135 136 137
static FORCE_INLINE void taosCacheMoveToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
  taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
  taosAddToTrash(pCacheObj, pNode);
H
hzcheng 已提交
138 139 140 141
}

/**
 * update data in cache
142
 * @param pCacheObj
H
hzcheng 已提交
143 144 145 146 147 148 149
 * @param pNode
 * @param key
 * @param keyLen
 * @param pData
 * @param dataSize
 * @return
 */
H
Haojun Liao 已提交
150 151 152
static UNUSED_FUNC SCacheDataNode *taosUpdateCacheImpl(SCacheObj *pCacheObj, SCacheDataNode* pNode, SCacheDataNode* pNewNode,
    const char *key, int32_t keyLen) {

H
hjxilinx 已提交
153
  // only a node is not referenced by any other object, in-place update it
H
Haojun Liao 已提交
154
  if (T_REF_VAL_GET(pNode) > 0) {
155
    taosCacheMoveToTrash(pCacheObj, pNode);
H
hzcheng 已提交
156
  }
H
Haojun Liao 已提交
157 158 159 160 161

  T_REF_INC(pNewNode);

  // addedTime new element to hashtable
  taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNewNode, sizeof(void *));
H
hzcheng 已提交
162 163 164 165
  return pNewNode;
}

/**
166
 * addedTime data into hash table
H
hzcheng 已提交
167 168 169
 * @param key
 * @param pData
 * @param size
170
 * @param pCacheObj
H
hzcheng 已提交
171 172 173 174
 * @param keyLen
 * @param pNode
 * @return
 */
H
hjxilinx 已提交
175
static FORCE_INLINE SCacheDataNode *taosAddToCacheImpl(SCacheObj *pCacheObj, const char *key, size_t keyLen, const void *pData,
176
                                                       size_t dataSize, uint64_t duration) {
177
  SCacheDataNode *pNode = taosCreateCacheNode(key, keyLen, pData, dataSize, duration);
H
hzcheng 已提交
178 179 180
  if (pNode == NULL) {
    return NULL;
  }
181 182
  
  T_REF_INC(pNode);
183
  taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode, sizeof(void *));
H
hzcheng 已提交
184 185 186
  return pNode;
}

187 188 189 190 191
/**
 * do cleanup the taos cache
 * @param pCacheObj
 */
static void doCleanupDataCache(SCacheObj *pCacheObj);
192

H
hzcheng 已提交
193
/**
194
 * refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime
H
hzcheng 已提交
195 196
 * @param handle   Cache object handle
 */
H
Haojun Liao 已提交
197
static void* taosCacheTimedRefresh(void *handle);
H
hjxilinx 已提交
198

199
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn, const char* cacheName) {
H
Haojun Liao 已提交
200
  if (refreshTimeInSeconds <= 0) {
H
hzcheng 已提交
201 202
    return NULL;
  }
203
  
204 205
  SCacheObj *pCacheObj = (SCacheObj *)calloc(1, sizeof(SCacheObj));
  if (pCacheObj == NULL) {
S
slguan 已提交
206
    uError("failed to allocate memory, reason:%s", strerror(errno));
H
hzcheng 已提交
207 208
    return NULL;
  }
209
  
H
Haojun Liao 已提交
210
  pCacheObj->pHashTable = taosHashInit(128, taosGetDefaultHashFunction(keyType), false);
H
Haojun Liao 已提交
211
  pCacheObj->name = strdup(cacheName);
212 213
  if (pCacheObj->pHashTable == NULL) {
    free(pCacheObj);
S
slguan 已提交
214
    uError("failed to allocate memory, reason:%s", strerror(errno));
H
hzcheng 已提交
215 216
    return NULL;
  }
217 218
  
  // set free cache node callback function for hash table
H
Haojun Liao 已提交
219 220 221
  pCacheObj->freeFp = fn;
  pCacheObj->refreshTime = refreshTimeInSeconds * 1000;
  pCacheObj->extendLifespan = extendLifespan;
222

223 224 225
  if (__cache_lock_init(pCacheObj) != 0) {
    taosHashCleanup(pCacheObj->pHashTable);
    free(pCacheObj);
226
    
S
slguan 已提交
227
    uError("failed to init lock, reason:%s", strerror(errno));
H
hzcheng 已提交
228 229
    return NULL;
  }
230

S
Shengliang Guan 已提交
231
  pthread_attr_t thattr;
232 233 234
  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);

235
  pthread_create(&pCacheObj->refreshWorker, &thattr, taosCacheTimedRefresh, pCacheObj);
236 237

  pthread_attr_destroy(&thattr);
238
  return pCacheObj;
239
}
H
hzcheng 已提交
240

H
Haojun Liao 已提交
241
void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int duration) {
242
  if (pCacheObj == NULL || pCacheObj->pHashTable == NULL) {
243 244
    return NULL;
  }
H
Haojun Liao 已提交
245

H
Haojun Liao 已提交
246 247 248 249 250 251
  SCacheDataNode *pNode1 = taosCreateCacheNode(key, keyLen, pData, dataSize, duration);
  if (pNode1 == NULL) {
    uError("cache:%s, key:%p, failed to added into cache, out of memory", pCacheObj->name, key);
    return NULL;
  }

H
Haojun Liao 已提交
252
  __cache_wr_lock(pCacheObj);
253
  SCacheDataNode **pt = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
254 255 256
  SCacheDataNode * pOld = (pt != NULL) ? (*pt) : NULL;
  
  if (pOld == NULL) {  // do addedTime to cache
H
Haojun Liao 已提交
257 258 259 260 261 262 263 264 265 266 267 268
    T_REF_INC(pNode1);
    taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *));
    __cache_unlock(pCacheObj);

    atomic_add_fetch_64(&pCacheObj->totalSize, pNode1->size);
    uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", totalNum:%d totalSize:%" PRId64
           "bytes size:%" PRId64 "bytes",
           pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime,
           (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, (int64_t)dataSize);
  } else {  // old data exists, update the node
    bool addToTrashcan = false;
    if (T_REF_VAL_GET(pOld) > 0) {
H
Haojun Liao 已提交
269

H
Haojun Liao 已提交
270 271 272
      // todo removed by node, instead of by key
      taosHashRemove(pCacheObj->pHashTable, pOld->key, pOld->keySize);
      addToTrashcan = true;
273
    }
274

H
Haojun Liao 已提交
275
    T_REF_INC(pNode1);
276

H
Haojun Liao 已提交
277 278 279 280 281 282 283 284
    // addedTime new element to hashtable
    taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *));
    __cache_unlock(pCacheObj);

    // todo add trashcan lock
    if (addToTrashcan) {
      taosAddToTrash(pCacheObj, pOld);
    } else {
H
Haojun Liao 已提交
285
      free(pOld);
H
Haojun Liao 已提交
286 287 288 289 290 291
    }

    uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode1->data, pOld);
  }

  return pNode1->data;
H
hzcheng 已提交
292 293
}

H
Haojun Liao 已提交
294
void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) {
295
  if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) {
296
    return NULL;
297
  }
H
Haojun Liao 已提交
298

S
Shengliang Guan 已提交
299 300
  void *pData = NULL;

301 302
  __cache_rd_lock(pCacheObj);
  SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
H
Haojun Liao 已提交
303 304

  int32_t ref = 0;
305
  if (ptNode != NULL) {
H
Haojun Liao 已提交
306
    ref = T_REF_INC(*ptNode);
S
Shengliang Guan 已提交
307
    pData = (*ptNode)->data;
308
  }
S
Shengliang Guan 已提交
309

310
  __cache_unlock(pCacheObj);
S
Shengliang Guan 已提交
311 312

  if (pData != NULL) {
313
    atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
S
Shengliang Guan 已提交
314
    uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData, ref);
315
  } else {
316
    atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
H
Haojun Liao 已提交
317
    uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
318
  }
S
Shengliang Guan 已提交
319

320
  atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
S
Shengliang Guan 已提交
321
  return pData;
322 323
}

324
void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, void *key, size_t keyLen, uint64_t expireTime) {
S
Shengliang Guan 已提交
325 326 327
  if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) {
    return NULL;
  }
H
Haojun Liao 已提交
328

S
Shengliang Guan 已提交
329 330 331 332 333
  __cache_rd_lock(pCacheObj);
  
  SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
  if (ptNode != NULL) {
     T_REF_INC(*ptNode);
S
Shengliang Guan 已提交
334
    (*ptNode)->expireTime = expireTime; // taosGetTimestampMs() + (*ptNode)->lifespan;
S
Shengliang Guan 已提交
335
  }
336

S
Shengliang Guan 已提交
337
  __cache_unlock(pCacheObj);
338

S
Shengliang Guan 已提交
339 340
  if (ptNode != NULL) {
    atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
H
Haojun Liao 已提交
341 342
    uDebug("cache:%s, key:%p, %p expireTime is updated in cache, refcnt:%d", pCacheObj->name, key,
        (*ptNode)->data, T_REF_VAL_GET(*ptNode));
S
Shengliang Guan 已提交
343 344
  } else {
    atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
H
Haojun Liao 已提交
345
    uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
S
Shengliang Guan 已提交
346
  }
347

S
Shengliang Guan 已提交
348 349 350 351
  atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
  return (ptNode != NULL) ? (*ptNode)->data : NULL;
}

352 353
void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
  if (pCacheObj == NULL || data == NULL) return NULL;
354
  
355 356
  size_t          offset = offsetof(SCacheDataNode, data);
  SCacheDataNode *ptNode = (SCacheDataNode *)((char *)data - offset);
357
  
358
  if (ptNode->signature != (uint64_t)ptNode) {
S
slguan 已提交
359
    uError("key: %p the data from cache is invalid", ptNode);
360 361
    return NULL;
  }
362

363
  int32_t ref = T_REF_INC(ptNode);
H
Haojun Liao 已提交
364
  uDebug("cache:%s, data: %p acquired by data in cache, refcnt:%d", pCacheObj->name, ptNode->data, ref);
H
Haojun Liao 已提交
365

366 367 368 369 370
  // the data if referenced by at least one object, so the reference count must be greater than the value of 2.
  assert(ref >= 2);
  return data;
}

371
void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) {
H
Haojun Liao 已提交
372
  if (pCacheObj == NULL || data == NULL || (*data) == NULL) return NULL;
373
  
374 375
  size_t          offset = offsetof(SCacheDataNode, data);
  SCacheDataNode *ptNode = (SCacheDataNode *)((char *)(*data) - offset);
376
  
377
  if (ptNode->signature != (uint64_t)ptNode) {
S
slguan 已提交
378
    uError("key: %p the data from cache is invalid", ptNode);
379 380 381
    return NULL;
  }
  
382
  assert(T_REF_VAL_GET(ptNode) >= 1);
383
  
384
  char *d = *data;
385 386 387 388 389
  
  // clear its reference to old area
  *data = NULL;
  
  return d;
H
hzcheng 已提交
390
}
391

392 393
void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
  if (pCacheObj == NULL || (*data) == NULL || (taosHashGetSize(pCacheObj->pHashTable) + pCacheObj->numOfElemsInTrash == 0)) {
394 395 396 397 398 399 400
    return;
  }
  
  size_t offset = offsetof(SCacheDataNode, data);
  
  SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset);
  if (pNode->signature != (uint64_t)pNode) {
401
    uError("%p, release invalid cache data", pNode);
402 403
    return;
  }
404

405
  *data = NULL;
406

407
  // note: extend lifespan before dec ref count
H
Haojun Liao 已提交
408 409
  bool inTrashCan = pNode->inTrashCan;

H
Haojun Liao 已提交
410
  if (pCacheObj->extendLifespan && (!inTrashCan) && (!_remove)) {
411 412 413
    atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs());
    uDebug("cache:%s data:%p extend life time to %"PRId64 "  before release", pCacheObj->name, pNode->data, pNode->expireTime);
  }
H
Haojun Liao 已提交
414

H
Haojun Liao 已提交
415 416 417 418 419 420 421 422 423 424 425 426 427 428 429
  if (_remove) {
    __cache_wr_lock(pCacheObj);

    // NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
    int32_t ref = T_REF_DEC(pNode);
    uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, ref);

    /*
     * If it is not referenced by other users, remove it immediately. Otherwise move this node to trashcan wait for all users
     * releasing this resources.
     *
     * NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread
     * that tries to do the same thing.
     */
    if (pNode->inTrashCan) {
H
Haojun Liao 已提交
430 431
      __cache_unlock(pCacheObj);

H
Haojun Liao 已提交
432 433 434 435 436
      if (ref == 0) {
        assert(pNode->pTNodeHeader->pData == pNode);
        taosRemoveFromTrashCan(pCacheObj, pNode->pTNodeHeader);
      }
    } else {
H
Haojun Liao 已提交
437 438 439
      taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
      __cache_unlock(pCacheObj);

H
Haojun Liao 已提交
440 441
      if (ref > 0) {
        assert(pNode->pTNodeHeader == NULL);
H
Haojun Liao 已提交
442 443 444

        // todo trashcan lock
        taosAddToTrash(pCacheObj, pNode);
H
Haojun Liao 已提交
445
      } else {
H
Haojun Liao 已提交
446 447 448 449 450 451 452 453
//        taosCacheReleaseNode(pCacheObj, pNode);
        atomic_fetch_sub_ptr(&pCacheObj->totalSize, pNode->size);
        uDebug("cache:%s, key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes",
               pCacheObj->name, pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize,
               pNode->size);

        if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data);
        free(pNode);
H
Haojun Liao 已提交
454
      }
H
Haojun Liao 已提交
455 456
    }

457
  } else {
H
Haojun Liao 已提交
458 459
    // NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
    int32_t ref = T_REF_DEC(pNode);
H
Haojun Liao 已提交
460 461 462

    uDebug("cache:%s, key:%p, %p released, refcnt:%d, data in trancan:%d", pCacheObj->name, pNode->key, pNode->data,
           ref, inTrashCan);
463 464 465
  }
}

466
void taosCacheEmpty(SCacheObj *pCacheObj) {
467 468 469 470 471 472 473 474
  SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);
  
  __cache_wr_lock(pCacheObj);
  while (taosHashIterNext(pIter)) {
    if (pCacheObj->deleting == 1) {
      break;
    }
    
475 476
    SCacheDataNode *pNode = *(SCacheDataNode **) taosHashIterGet(pIter);
    if (T_REF_VAL_GET(pNode) == 0) {
H
Haojun Liao 已提交
477 478 479 480
      taosCacheReleaseNode(pCacheObj, pNode);
    } else {
      taosCacheMoveToTrash(pCacheObj, pNode);
    }
481 482 483 484
  }
  __cache_unlock(pCacheObj);
  
  taosHashDestroyIter(pIter);
485
  taosTrashCanEmpty(pCacheObj, false);
486 487 488 489 490 491
}

void taosCacheCleanup(SCacheObj *pCacheObj) {
  if (pCacheObj == NULL) {
    return;
  }
492 493 494 495

  pCacheObj->deleting = 1;
  pthread_join(pCacheObj->refreshWorker, NULL);

H
Haojun Liao 已提交
496
  uInfo("cache:%s will be cleaned up", pCacheObj->name);
497 498 499
  doCleanupDataCache(pCacheObj);
}

H
Haojun Liao 已提交
500
SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration) {
501
  size_t totalSize = size + sizeof(SCacheDataNode) + keyLen;
502 503 504 505 506 507 508 509 510 511 512 513 514 515

  SCacheDataNode *pNewNode = calloc(1, totalSize);
  if (pNewNode == NULL) {
    uError("failed to allocate memory, reason:%s", strerror(errno));
    return NULL;
  }

  memcpy(pNewNode->data, pData, size);

  pNewNode->key = (char *)pNewNode + sizeof(SCacheDataNode) + size;
  pNewNode->keySize = keyLen;

  memcpy(pNewNode->key, key, keyLen);

H
Haojun Liao 已提交
516 517
  pNewNode->addedTime    = (uint64_t)taosGetTimestampMs();
  pNewNode->lifespan     = duration;
518
  pNewNode->expireTime   = pNewNode->addedTime + pNewNode->lifespan;
H
Haojun Liao 已提交
519 520
  pNewNode->signature    = (uint64_t)pNewNode;
  pNewNode->size         = (uint32_t)totalSize;
521 522 523 524 525 526

  return pNewNode;
}

void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
  if (pNode->inTrashCan) { /* node is already in trash */
527
    assert(pNode->pTNodeHeader != NULL && pNode->pTNodeHeader->pData == pNode);
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542
    return;
  }

  STrashElem *pElem = calloc(1, sizeof(STrashElem));
  pElem->pData = pNode;

  pElem->next = pCacheObj->pTrash;
  if (pCacheObj->pTrash) {
    pCacheObj->pTrash->prev = pElem;
  }

  pElem->prev = NULL;
  pCacheObj->pTrash = pElem;

  pNode->inTrashCan = true;
543
  pNode->pTNodeHeader = pElem;
544 545
  pCacheObj->numOfElemsInTrash++;

546
  uDebug("key:%p, %p move to trash, numOfElem in trash:%d", pNode->key, pNode->data, pCacheObj->numOfElemsInTrash);
547 548 549 550
}

void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem) {
  if (pElem->pData->signature != (uint64_t)pElem->pData) {
H
Hui Li 已提交
551
    uError("key:sig:0x%" PRIx64 " %p data has been released, ignore", pElem->pData->signature, pElem->pData);
552 553 554 555 556 557 558 559 560 561 562 563 564 565 566
    return;
  }

  pCacheObj->numOfElemsInTrash--;
  if (pElem->prev) {
    pElem->prev->next = pElem->next;
  } else { /* pnode is the header, update header */
    pCacheObj->pTrash = pElem->next;
  }

  if (pElem->next) {
    pElem->next->prev = pElem->prev;
  }

  pElem->pData->signature = 0;
H
Haojun Liao 已提交
567 568 569
  if (pCacheObj->freeFp) {
    pCacheObj->freeFp(pElem->pData->data);
  }
H
Haojun Liao 已提交
570

571 572 573 574
  free(pElem->pData);
  free(pElem);
}

H
Haojun Liao 已提交
575
// TODO add another lock when scanning trashcan
576 577 578 579 580 581 582 583 584 585 586
void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) {
  __cache_wr_lock(pCacheObj);

  if (pCacheObj->numOfElemsInTrash == 0) {
    if (pCacheObj->pTrash != NULL) {
      uError("key:inconsistency data in cache, numOfElem in trash:%d", pCacheObj->numOfElemsInTrash);
    }
    pCacheObj->pTrash = NULL;

    __cache_unlock(pCacheObj);
    return;
H
hjxilinx 已提交
587
  }
588 589 590 591 592 593 594 595 596 597

  STrashElem *pElem = pCacheObj->pTrash;

  while (pElem) {
    T_REF_VAL_CHECK(pElem->pData);
    if (pElem->next == pElem) {
      pElem->next = NULL;
    }

    if (force || (T_REF_VAL_GET(pElem->pData) == 0)) {
598
      uDebug("key:%p, %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData->data,
599 600 601 602 603 604 605 606 607 608 609 610 611 612 613
             pCacheObj->numOfElemsInTrash - 1);
      STrashElem *p = pElem;

      pElem = pElem->next;
      taosRemoveFromTrashCan(pCacheObj, p);
    } else {
      pElem = pElem->next;
    }
  }

  __cache_unlock(pCacheObj);
}

void doCleanupDataCache(SCacheObj *pCacheObj) {
  __cache_wr_lock(pCacheObj);
614 615 616 617

  SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);
  while (taosHashIterNext(pIter)) {
    SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
H
Haojun Liao 已提交
618 619 620

    int32_t c = T_REF_VAL_GET(pNode);
    if (c <= 0) {
621 622
      taosCacheReleaseNode(pCacheObj, pNode);
    } else {
H
Haojun Liao 已提交
623 624
      uDebug("cache:%s key:%p, %p will not remove from cache, refcnt:%d", pCacheObj->name, pNode->key,
          pNode->data, T_REF_VAL_GET(pNode));
625
    }
626 627 628
  }
  taosHashDestroyIter(pIter);

H
Haojun Liao 已提交
629 630
  // todo memory leak if there are object with refcount greater than 0 in hash table?
  taosHashCleanup(pCacheObj->pHashTable);
631 632 633 634
  __cache_unlock(pCacheObj);

  taosTrashCanEmpty(pCacheObj, true);
  __cache_lock_destroy(pCacheObj);
H
Haojun Liao 已提交
635 636
  
  tfree(pCacheObj->name);
637 638
  memset(pCacheObj, 0, sizeof(SCacheObj));
  free(pCacheObj);
639
}
640

641 642 643 644 645 646
static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_free_fn_t fp) {
  SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);

  __cache_wr_lock(pCacheObj);
  while (taosHashIterNext(pIter)) {
    SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
H
Haojun Liao 已提交
647

648
    if (pNode->expireTime < time && T_REF_VAL_GET(pNode) <= 0) {
649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664
      taosCacheReleaseNode(pCacheObj, pNode);
      continue;
    }

    if (fp) {
      fp(pNode->data);
    }
  }

  __cache_unlock(pCacheObj);

  taosHashDestroyIter(pIter);
}

void* taosCacheTimedRefresh(void *handle) {
  SCacheObj* pCacheObj = handle;
665
  if (pCacheObj == NULL) {
666
    uDebug("object is destroyed. no refresh retry");
667 668 669 670 671 672 673 674 675 676 677 678
    return NULL;
  }

  const int32_t SLEEP_DURATION = 500; //500 ms
  int64_t totalTick = pCacheObj->refreshTime / SLEEP_DURATION;

  int64_t count = 0;
  while(1) {
    taosMsleep(500);

    // check if current cache object will be deleted every 500ms.
    if (pCacheObj->deleting) {
679
      uDebug("%s refresh threads quit", pCacheObj->name);
680 681 682 683 684 685 686 687 688
      break;
    }

    if (++count < totalTick) {
      continue;
    }

    // reset the count value
    count = 0;
H
Haojun Liao 已提交
689 690
    size_t elemInHash = taosHashGetSize(pCacheObj->pHashTable);
    if (elemInHash + pCacheObj->numOfElemsInTrash == 0) {
691 692 693 694 695
      continue;
    }

    pCacheObj->statistics.refreshCount++;

H
Haojun Liao 已提交
696 697
    // refresh data in hash table
    if (elemInHash > 0) {
698 699
      int64_t now = taosGetTimestampMs();
      doCacheRefresh(pCacheObj, now, NULL);
H
Haojun Liao 已提交
700
    }
701 702 703 704 705

    taosTrashCanEmpty(pCacheObj, false);
  }

  return NULL;
dengyihao's avatar
dengyihao 已提交
706
}
707 708 709 710 711 712 713 714 715

void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp) {
  if (pCacheObj == NULL) {
    return;
  }

  int64_t now = taosGetTimestampMs();
  doCacheRefresh(pCacheObj, now, fp);
}