tcache.c 21.9 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16 17 18
#define _DEFAULT_SOURCE
#include "os.h"
#include "tulog.h"
H
hzcheng 已提交
19 20
#include "ttimer.h"
#include "tutil.h"
S
slguan 已提交
21 22 23
#include "tcache.h"
#include "hash.h"
#include "hashfunc.h"
H
hzcheng 已提交
24

25
/**
 * Take the cache lock for exclusive (writer) access.
 * LINUX builds lock a pthread rwlock in write mode; all other builds lock a pthread mutex.
 */
static FORCE_INLINE void __cache_wr_lock(SCacheObj *pCacheObj) {
#if !defined(LINUX)
  pthread_mutex_lock(&pCacheObj->lock);
#else
  pthread_rwlock_wrlock(&pCacheObj->lock);
#endif
}

33
/**
 * Take the cache lock for reading.
 * LINUX builds lock a pthread rwlock in read mode; all other builds lock the same pthread
 * mutex that the writer path uses.
 */
static FORCE_INLINE void __cache_rd_lock(SCacheObj *pCacheObj) {
#if !defined(LINUX)
  pthread_mutex_lock(&pCacheObj->lock);
#else
  pthread_rwlock_rdlock(&pCacheObj->lock);
#endif
}

41
/**
 * Release the cache lock taken by __cache_rd_lock() or __cache_wr_lock().
 */
static FORCE_INLINE void __cache_unlock(SCacheObj *pCacheObj) {
#if !defined(LINUX)
  pthread_mutex_unlock(&pCacheObj->lock);
#else
  pthread_rwlock_unlock(&pCacheObj->lock);
#endif
}

49
/**
 * Initialize the cache lock with default attributes.
 * @return the value returned by the underlying pthread init call (0 on success)
 */
static FORCE_INLINE int32_t __cache_lock_init(SCacheObj *pCacheObj) {
#if !defined(LINUX)
  return pthread_mutex_init(&pCacheObj->lock, NULL);
#else
  return pthread_rwlock_init(&pCacheObj->lock, NULL);
#endif
}

57
/**
 * Destroy the cache lock created by __cache_lock_init().
 */
static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pCacheObj) {
#if !defined(LINUX)
  pthread_mutex_destroy(&pCacheObj->lock);
#else
  pthread_rwlock_destroy(&pCacheObj->lock);
#endif
}

H
hzcheng 已提交
65 66 67 68 69 70
/**
 * @param key      key of the object for hashing, usually a null-terminated string
 * @param keyLen   length of key
 * @param pData    the actual data. It must be one contiguous memory block that contains no
 *                 pointers; copying pointers causes memory access errors.
 * @param size     size of the data block
 * @param duration total survival time from now, after which the entry expires
 * @return         SCacheDataNode
 */
74
static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration);
H
hzcheng 已提交
75 76

/**
 * add an object node into the trash; the object is closed for referencing once it has been added to the trash.
 * It will not be removed until pNode->refCount == 0
 * @param pCacheObj    Cache object
 * @param pNode   Cache slot object
 */
82 83
static void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode);

H
hzcheng 已提交
84 85 86
/**
 * remove nodes in trash with refCount == 0 in cache
 * @param pCacheObj
 * @param force   force mode: if true, remove data in trash without checking the refcount.
 *                This may cause corruption, so force mode only applies before the cache is closed
 */
91
static void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force);
H
hzcheng 已提交
92 93 94

/**
 * release node
95
 * @param pCacheObj      cache object
H
Haojun Liao 已提交
96
 * @param pNode          data node
H
hzcheng 已提交
97
 */
98
static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
H
hzcheng 已提交
99
  if (pNode->signature != (uint64_t)pNode) {
S
slguan 已提交
100
    uError("key:%s, %p data is invalid, or has been released", pNode->key, pNode);
H
Haojun Liao 已提交
101
    assert(0);
H
hzcheng 已提交
102 103
    return;
  }
104
  
105
  pCacheObj->totalSize -= pNode->size;
H
Haojun Liao 已提交
106
  int32_t size = taosHashGetSize(pCacheObj->pHashTable);
H
Haojun Liao 已提交
107 108 109 110
  assert(size > 0);

  uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, num:%d size:%" PRId64 "bytes",
         pCacheObj->name, pNode->key, pNode->data, pNode->size, size - 1, pCacheObj->totalSize);
H
Haojun Liao 已提交
111 112 113 114

  if (pCacheObj->freeFp) {
    pCacheObj->freeFp(pNode->data);
  }
115

H
hzcheng 已提交
116 117 118 119 120
  free(pNode);
}

/**
 * move the old node into trash
121
 * @param pCacheObj
H
hzcheng 已提交
122 123
 * @param pNode
 */
124 125 126
static FORCE_INLINE void taosCacheMoveToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
  taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
  taosAddToTrash(pCacheObj, pNode);
H
hzcheng 已提交
127 128
}

H
Haojun Liao 已提交
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
/**
 * Unlink one element from the doubly linked trash list and decrement the trash counter.
 * Nothing is freed here. If the element's node fails the signature check, the element
 * is reported and left in the list.
 *
 * @param pCacheObj  cache object owning the trash list
 * @param pElem      trash element to unlink
 */
static FORCE_INLINE void doRemoveElemInTrashcan(SCacheObj* pCacheObj, STrashElem *pElem) {
  SCacheDataNode *pNode = pElem->pData;
  if (pNode->signature != (uint64_t) pNode) {
    uError("key:sig:0x%" PRIx64 " %p data has been released, ignore", pNode->signature, pNode);
    return;
  }

  STrashElem *pBefore = pElem->prev;
  STrashElem *pAfter  = pElem->next;

  pCacheObj->numOfElemsInTrash--;

  if (pBefore == NULL) {
    // the element is the list head, so the head moves on
    pCacheObj->pTrash = pAfter;
  } else {
    pBefore->next = pAfter;
  }

  if (pAfter != NULL) {
    pAfter->prev = pBefore;
  }
}

/**
 * Free a trash element together with the cache node it carries. The user-supplied
 * free callback, when set, is applied to the node's payload first.
 *
 * @param pCacheObj  cache object providing the free callback
 * @param pElem      trash element to destroy
 */
static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj* pCacheObj, STrashElem *pElem) {
  SCacheDataNode *pNode = pElem->pData;

  if (pCacheObj->freeFp != NULL) {
    pCacheObj->freeFp(pNode->data);
  }

  free(pNode);
  free(pElem);
}
H
hzcheng 已提交
155 156
/**
 * update data in cache
157
 * @param pCacheObj
H
hzcheng 已提交
158 159 160 161 162 163 164
 * @param pNode
 * @param key
 * @param keyLen
 * @param pData
 * @param dataSize
 * @return
 */
H
Haojun Liao 已提交
165 166 167
static UNUSED_FUNC SCacheDataNode *taosUpdateCacheImpl(SCacheObj *pCacheObj, SCacheDataNode* pNode, SCacheDataNode* pNewNode,
    const char *key, int32_t keyLen) {

H
hjxilinx 已提交
168
  // only a node is not referenced by any other object, in-place update it
H
Haojun Liao 已提交
169
  if (T_REF_VAL_GET(pNode) > 0) {
170
    taosCacheMoveToTrash(pCacheObj, pNode);
H
hzcheng 已提交
171
  }
H
Haojun Liao 已提交
172 173 174 175 176

  T_REF_INC(pNewNode);

  // addedTime new element to hashtable
  taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNewNode, sizeof(void *));
H
hzcheng 已提交
177 178 179 180
  return pNewNode;
}

/**
181
 * addedTime data into hash table
H
hzcheng 已提交
182 183 184
 * @param key
 * @param pData
 * @param size
185
 * @param pCacheObj
H
hzcheng 已提交
186 187 188 189
 * @param keyLen
 * @param pNode
 * @return
 */
H
hjxilinx 已提交
190
static FORCE_INLINE SCacheDataNode *taosAddToCacheImpl(SCacheObj *pCacheObj, const char *key, size_t keyLen, const void *pData,
191
                                                       size_t dataSize, uint64_t duration) {
192
  SCacheDataNode *pNode = taosCreateCacheNode(key, keyLen, pData, dataSize, duration);
H
hzcheng 已提交
193 194 195
  if (pNode == NULL) {
    return NULL;
  }
196 197
  
  T_REF_INC(pNode);
198
  taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode, sizeof(void *));
H
hzcheng 已提交
199 200 201
  return pNode;
}

202 203 204 205 206
/**
 * do cleanup the taos cache
 * @param pCacheObj
 */
static void doCleanupDataCache(SCacheObj *pCacheObj);
207

H
hzcheng 已提交
208
/**
 * refresh the cache every pCacheObj->refreshTime: remove the nodes whose refcount == 0
 * from both the hash list and the trash
 * @param handle   Cache object handle
 */
H
Haojun Liao 已提交
212
static void* taosCacheTimedRefresh(void *handle);
H
hjxilinx 已提交
213

214
/**
 * Create a cache object and start its background refresh thread.
 *
 * Every failure path releases whatever has been set up so far (hash table, name copy,
 * lock) before returning NULL.
 *
 * @param keyType               key type used to pick the default hash function
 * @param refreshTimeInSeconds  refresh interval in seconds; must be positive
 * @param extendLifespan        stored in the cache object; taosCacheRelease() uses it to
 *                              decide whether the expire time is pushed forward on release
 * @param fn                    callback used to free the payload of a node, may be NULL
 * @param cacheName             name used in log messages; NULL is treated as an empty name
 * @return                      the new cache object, or NULL on invalid argument or failure
 */
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn, const char* cacheName) {
  if (refreshTimeInSeconds <= 0) {
    return NULL;
  }

  SCacheObj *pCacheObj = (SCacheObj *)calloc(1, sizeof(SCacheObj));
  if (pCacheObj == NULL) {
    uError("failed to allocate memory, reason:%s", strerror(errno));
    return NULL;
  }

  pCacheObj->pHashTable = taosHashInit(4096, taosGetDefaultHashFunction(keyType), false, HASH_ENTRY_LOCK);
  if (pCacheObj->pHashTable == NULL) {
    // log first: free() is allowed to modify errno
    uError("failed to allocate memory, reason:%s", strerror(errno));
    free(pCacheObj);
    return NULL;
  }

  // strdup(NULL) is undefined, so fall back to an empty name
  pCacheObj->name = strdup((cacheName != NULL) ? cacheName : "");
  if (pCacheObj->name == NULL) {
    uError("failed to allocate memory, reason:%s", strerror(errno));
    taosHashCleanup(pCacheObj->pHashTable);
    free(pCacheObj);
    return NULL;
  }

  // set free cache node callback function for hash table
  pCacheObj->freeFp = fn;
  pCacheObj->refreshTime = refreshTimeInSeconds * 1000;
  pCacheObj->extendLifespan = extendLifespan;

  // pthread functions report failures through their return value, not through errno
  int32_t code = __cache_lock_init(pCacheObj);
  if (code != 0) {
    uError("failed to init lock, reason:%s", strerror(code));
    taosHashCleanup(pCacheObj->pHashTable);
    free(pCacheObj->name);
    free(pCacheObj);
    return NULL;
  }

  pthread_attr_t thattr;
  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);

  code = pthread_create(&pCacheObj->refreshWorker, &thattr, taosCacheTimedRefresh, pCacheObj);
  pthread_attr_destroy(&thattr);

  if (code != 0) {
    // without the worker thread a later pthread_join() in taosCacheCleanup() would be invalid
    uError("failed to create refresh thread, reason:%s", strerror(code));
    __cache_lock_destroy(pCacheObj);
    taosHashCleanup(pCacheObj->pHashTable);
    free(pCacheObj->name);
    free(pCacheObj);
    return NULL;
  }

  return pCacheObj;
}
H
hzcheng 已提交
255

H
Haojun Liao 已提交
256
/**
 * Copy pData into a new cache node and store it under key, replacing any existing entry.
 *
 * The new node is created with one reference. While the hash table rejects the insert
 * (treated as "duplicated key exists"), the entry currently stored under the key is taken
 * out: it is freed at once when nobody references it, otherwise it is moved to the trash.
 * The insert is then retried until it succeeds.
 *
 * @param pCacheObj  cache object
 * @param key        key of the entry
 * @param keyLen     length of key
 * @param pData      data to be copied into the cache
 * @param dataSize   size of pData
 * @param duration   lifespan of the entry, added to the insertion time to get the expire time
 * @return           pointer to the cached copy of the data, or NULL on invalid cache
 *                   object or allocation failure
 */
void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int duration) {
  if (pCacheObj == NULL || pCacheObj->pHashTable == NULL) {
    return NULL;
  }

  SCacheDataNode *pNode1 = taosCreateCacheNode(key, keyLen, pData, dataSize, duration);
  if (pNode1 == NULL) {
    uError("cache:%s, key:%p, failed to added into cache, out of memory", pCacheObj->name, key);
    return NULL;
  }

  T_REF_INC(pNode1);

  // a failed put means a duplicated key exists: evict the old entry and try again
  while (taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *)) != 0) {
    SCacheDataNode* p = NULL;
    int32_t ret = taosHashRemoveWithData(pCacheObj->pHashTable, key, keyLen, (void*) &p, sizeof(void*));

    if (ret == 0) {
      if (T_REF_VAL_GET(p) == 0) {
        // the old node leaves the cache for good, so it must no longer be counted in totalSize
        atomic_sub_fetch_64(&pCacheObj->totalSize, p->size);

        if (pCacheObj->freeFp) {
          pCacheObj->freeFp(p->data);
        }

        taosTFree(p);
      } else {
        // the trash list is shared with the refresh thread: modify it under the write lock,
        // as taosCacheRelease() does
        __cache_wr_lock(pCacheObj);
        taosAddToTrash(pCacheObj, p);
        __cache_unlock(pCacheObj);

        uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode1->data, p);
      }
    }

    assert(T_REF_VAL_GET(pNode1) == 1);
  }

  atomic_add_fetch_64(&pCacheObj->totalSize, pNode1->size);
  uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64
         ", totalNum:%d totalSize:%" PRId64 "bytes size:%" PRId64 "bytes",
         pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime,
         (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, (int64_t)dataSize);

  return pNode1->data;
}

H
Haojun Liao 已提交
317 318 319 320
/**
 * Callback handed to taosHashGetCB(): adds one reference to the node whose pointer
 * is stored at ptNode.
 *
 * @param ptNode  address of a SCacheDataNode pointer; must not be NULL
 */
static void incRefFn(void* ptNode) {
  assert(ptNode != NULL);

  SCacheDataNode *pNode = *(SCacheDataNode**) ptNode;
  assert(T_REF_VAL_GET(pNode) >= 0);

  int32_t ret = T_REF_INC(pNode);
  assert(ret > 0);
}

H
Haojun Liao 已提交
327
/**
 * Look up an entry by key and return its data.
 *
 * The lookup passes incRefFn to the hash table (presumably invoked on the entry that is
 * found, so the caller ends up holding a reference -- confirm against taosHashGetCB).
 * Hit/miss and total-access statistics are updated on every call that reaches the lookup.
 *
 * @param pCacheObj  cache object
 * @param key        key to look for
 * @param keyLen     length of key
 * @return           the cached data, or NULL when the cache is NULL/empty or the key is absent
 */
void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) {
  if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) {
    return NULL;
  }

  SCacheDataNode **ppFound = (SCacheDataNode **)taosHashGetCB(pCacheObj->pHashTable, key, keyLen, incRefFn);

  void *pData = NULL;
  if (ppFound != NULL) {
    pData = (*ppFound)->data;
  }

  if (pData == NULL) {
    atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
    uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
  } else {
    atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
    uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData, T_REF_VAL_GET(*ppFound));
  }

  atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
  return pData;
}

H
Haojun Liao 已提交
347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373
//void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, void *key, size_t keyLen, uint64_t expireTime) {
//  if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) {
//    return NULL;
//  }
//
//  __cache_rd_lock(pCacheObj);
//
//  SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
//  if (ptNode != NULL) {
//     T_REF_INC(*ptNode);
//    (*ptNode)->expireTime = expireTime; // taosGetTimestampMs() + (*ptNode)->lifespan;
//  }
//
//  __cache_unlock(pCacheObj);
//
//  if (ptNode != NULL) {
//    atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
//    uDebug("cache:%s, key:%p, %p expireTime is updated in cache, refcnt:%d", pCacheObj->name, key,
//        (*ptNode)->data, T_REF_VAL_GET(*ptNode));
//  } else {
//    atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
//    uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
//  }
//
//  atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
//  return (ptNode != NULL) ? (*ptNode)->data : NULL;
//}
S
Shengliang Guan 已提交
374

375 376
/**
 * Take an additional reference on an entry, identified by a data pointer that was
 * handed out by this cache earlier.
 *
 * The owning node is located by stepping back from data by the offset of the data
 * member and is validated through its signature.
 *
 * @param pCacheObj  cache object
 * @param data       data pointer previously returned by the cache
 * @return           data, or NULL when an argument is NULL or the signature check fails
 */
void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
  if (pCacheObj == NULL || data == NULL) return NULL;

  SCacheDataNode *pOwner = (SCacheDataNode *)((char *)data - offsetof(SCacheDataNode, data));
  if (pOwner->signature != (uint64_t)pOwner) {
    uError("key: %p the data from cache is invalid", pOwner);
    return NULL;
  }

  int32_t refAfter = T_REF_INC(pOwner);
  uDebug("cache:%s, data: %p acquired by data in cache, refcnt:%d", pCacheObj->name, pOwner->data, refAfter);

  // the data is already referenced by at least one object, so the count must now be at least 2
  assert(refAfter >= 2);
  return data;
}

394
/**
 * Move a held data pointer out of *data: the pointer is returned and *data is set to NULL.
 * The reference count of the owning node is not changed.
 *
 * @param pCacheObj  cache object
 * @param data       address of the data pointer to hand over
 * @return           the former value of *data, or NULL when an argument is NULL or the
 *                   owning node fails the signature check (*data is left untouched then)
 */
void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) {
  if (pCacheObj == NULL || data == NULL || (*data) == NULL) return NULL;

  SCacheDataNode *pOwner = (SCacheDataNode *)((char *)(*data) - offsetof(SCacheDataNode, data));
  if (pOwner->signature != (uint64_t)pOwner) {
    uError("key: %p the data from cache is invalid", pOwner);
    return NULL;
  }

  assert(T_REF_VAL_GET(pOwner) >= 1);

  void *pHandedOver = *data;

  // the old holder gives up its pointer
  *data = NULL;

  return pHandedOver;
}
414

415 416
/**
 * Give back a reference obtained from the cache and clear the caller's pointer.
 *
 * Without _remove the reference count is simply decremented; if the cache was created
 * with extendLifespan and the node is not in the trash, its expire time is pushed
 * forward first. With _remove the node is additionally taken out of the cache: a node
 * in the trash is destroyed once its count drops to 0; a node in the hash table is
 * removed from it and then either destroyed (count 0) or moved to the trash.
 *
 * @param pCacheObj  cache object
 * @param data       address of the data pointer to release; *data is set to NULL
 * @param _remove    true to also remove the entry from the cache
 */
void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
  // data itself has to be checked before it is dereferenced
  if (pCacheObj == NULL || data == NULL || (*data) == NULL) {
    return;
  }

  if (taosHashGetSize(pCacheObj->pHashTable) + pCacheObj->numOfElemsInTrash == 0) {
    return;
  }

  size_t offset = offsetof(SCacheDataNode, data);

  SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset);
  if (pNode->signature != (uint64_t)pNode) {
    uError("%p, release invalid cache data", pNode);
    return;
  }

  *data = NULL;

  // note: extend lifespan before dec ref count
  bool inTrashCan = pNode->inTrashCan;

  if (pCacheObj->extendLifespan && (!inTrashCan) && (!_remove)) {
    atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs());
    uDebug("cache:%s data:%p extend expire time: %"PRId64, pCacheObj->name, pNode->data, pNode->expireTime);
  }

  if (_remove) {
    // NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
    char* key = pNode->key;
    char* d = pNode->data;

    int32_t ref = T_REF_VAL_GET(pNode);
    uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, key, d, ref - 1);

    /*
     * If it is not referenced by other users, remove it immediately. Otherwise move this node to trashcan wait for all users
     * releasing this resources.
     *
     * NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread
     * that tries to do the same thing.
     */
    if (inTrashCan) {
      ref = T_REF_DEC(pNode);

      if (ref == 0) {
        assert(pNode->pTNodeHeader->pData == pNode);

        // unlink under the write lock, destroy outside of it
        __cache_wr_lock(pCacheObj);
        doRemoveElemInTrashcan(pCacheObj, pNode->pTNodeHeader);
        __cache_unlock(pCacheObj);

        doDestroyTrashcanElem(pCacheObj, pNode->pTNodeHeader);
      }
    } else {
      // NOTE: remove it from hash in the first place, otherwise, the pNode may have been released by other thread
      // when reaches here.
      int32_t ret = taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
      ref = T_REF_DEC(pNode);

      // successfully remove from hash table, if failed, this node must have been move to trash already, do nothing.
      // note that the remove operation can be executed only once.
      if (ret == 0) {
        if (ref > 0) {
          assert(pNode->pTNodeHeader == NULL);

          __cache_wr_lock(pCacheObj);
          taosAddToTrash(pCacheObj, pNode);
          __cache_unlock(pCacheObj);
        } else {  // ref == 0
          atomic_sub_fetch_64(&pCacheObj->totalSize, pNode->size);

          int32_t size = taosHashGetSize(pCacheObj->pHashTable);
          uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, num:%d size:%" PRId64 "bytes",
                 pCacheObj->name, pNode->key, pNode->data, pNode->size, size, pCacheObj->totalSize);

          if (pCacheObj->freeFp) {
            pCacheObj->freeFp(pNode->data);
          }

          free(pNode);
        }
      }
    }

  } else {
    // NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
    char* key = pNode->key;
    char* p = pNode->data;

    int32_t ref = T_REF_DEC(pNode);
    uDebug("cache:%s, key:%p, %p released, refcnt:%d, data in trancan:%d", pCacheObj->name, key, p, ref, inTrashCan);
  }
}
H
Haojun Liao 已提交
505

H
Haojun Liao 已提交
506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521
// Parameter bundle handed to the hash-table traversal callbacks in this file.
typedef struct SHashTravSupp {
  SCacheObj* pCacheObj;  // cache whose hash table is being traversed
  int64_t    time;       // timestamp supplied by the caller (taosGetTimestampMs()); compared with a node's expireTime
  __cache_free_fn_t fp;  // optional callback applied to the data of nodes that stay in the table; may be NULL
} SHashTravSupp;

/**
 * Traversal callback used to empty the hash table: a node that is still referenced is
 * moved to the trash, an unreferenced node is destroyed immediately.
 *
 * @param param  SHashTravSupp carrying the cache object
 * @param data   address of the SCacheDataNode pointer stored in the hash table
 * @return       always false, i.e. every visited node is to be removed from the hash table
 */
static bool travHashTableEmptyFn(void* param, void* data) {
  SCacheObj      *pCacheObj = ((SHashTravSupp*) param)->pCacheObj;
  SCacheDataNode *pNode = *(SCacheDataNode **) data;

  if (T_REF_VAL_GET(pNode) != 0) {
    // still in use somewhere: keep it around in the trashcan
    taosAddToTrash(pCacheObj, pNode);
  } else {
    taosCacheReleaseNode(pCacheObj, pNode);
  }

  // this node should be remove from hash table
  return false;
}

528
/**
 * Remove every entry from the cache: all nodes are taken out of the hash table
 * (destroyed when unreferenced, moved to the trash otherwise), then the unreferenced
 * nodes in the trash are destroyed.
 *
 * @param pCacheObj  cache object; NULL is ignored, as in taosCacheCleanup() and taosCacheRefresh()
 */
void taosCacheEmpty(SCacheObj *pCacheObj) {
  if (pCacheObj == NULL) {
    return;
  }

  SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()};

  taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup);
  taosTrashCanEmpty(pCacheObj, false);
}

/**
 * Shut the cache down: raise the deleting flag, wait for the refresh thread to
 * terminate and then release all resources of the cache.
 *
 * @param pCacheObj  cache object; NULL is ignored
 */
void taosCacheCleanup(SCacheObj *pCacheObj) {
  if (pCacheObj != NULL) {
    // the refresh thread polls this flag and quits once it is set
    pCacheObj->deleting = 1;
    pthread_join(pCacheObj->refreshWorker, NULL);

    uInfo("cache:%s will be cleaned up", pCacheObj->name);
    doCleanupDataCache(pCacheObj);
  }
}

H
Haojun Liao 已提交
547
/**
 * Allocate a cache node that holds a copy of the data followed by a copy of the key,
 * all in a single zero-initialized allocation.
 *
 * Layout: [SCacheDataNode header][data: size bytes][key: keyLen bytes]. The key copy
 * is not NUL-terminated.
 *
 * @param key       key of the entry
 * @param keyLen    length of key
 * @param pData     data to copy
 * @param size      size of pData
 * @param duration  lifespan; expireTime is set to the creation time plus duration
 * @return          the new node, or NULL when the sizes overflow or the allocation fails
 */
SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration) {
  const size_t headerSize = sizeof(SCacheDataNode);

  // reject sizes whose sum would wrap around and yield a too-small allocation
  if (size > SIZE_MAX - headerSize || keyLen > SIZE_MAX - headerSize - size) {
    uError("failed to create cache node, key and data sizes overflow");
    return NULL;
  }

  size_t totalSize = size + headerSize + keyLen;

  SCacheDataNode *pNewNode = calloc(1, totalSize);
  if (pNewNode == NULL) {
    uError("failed to allocate memory, reason:%s", strerror(errno));
    return NULL;
  }

  // memcpy must not be given an invalid pointer, even for a zero-length copy
  if (size > 0) {
    memcpy(pNewNode->data, pData, size);
  }

  // the key is stored right behind the data
  pNewNode->key = (char *)pNewNode + headerSize + size;
  pNewNode->keySize = keyLen;

  if (keyLen > 0) {
    memcpy(pNewNode->key, key, keyLen);
  }

  pNewNode->addedTime    = (uint64_t)taosGetTimestampMs();
  pNewNode->lifespan     = duration;
  pNewNode->expireTime   = pNewNode->addedTime + pNewNode->lifespan;
  pNewNode->signature    = (uint64_t)pNewNode;  // self-reference used for validity checks
  pNewNode->size         = (uint32_t)totalSize;

  return pNewNode;
}

/**
 * Put a node at the head of the trash list and mark it as being in the trash.
 * A node that is already in the trash is left as it is.
 *
 * This function does not take the cache lock itself.
 * If the list element cannot be allocated, the failure is logged and the node is not
 * tracked by the trash (the node is left untouched).
 *
 * @param pCacheObj  cache object owning the trash list
 * @param pNode      node to put into the trash
 */
void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
  if (pNode->inTrashCan) { /* node is already in trash */
    assert(pNode->pTNodeHeader != NULL && pNode->pTNodeHeader->pData == pNode);
    return;
  }

  STrashElem *pElem = calloc(1, sizeof(STrashElem));
  if (pElem == NULL) {
    // without this check the assignments below would dereference NULL
    uError("failed to allocate memory, reason:%s", strerror(errno));
    return;
  }

  pElem->pData = pNode;

  // link the element in front of the current head
  pElem->next = pCacheObj->pTrash;
  if (pCacheObj->pTrash) {
    pCacheObj->pTrash->prev = pElem;
  }

  pElem->prev = NULL;
  pCacheObj->pTrash = pElem;

  pNode->inTrashCan = true;
  pNode->pTNodeHeader = pElem;
  pCacheObj->numOfElemsInTrash++;

  uDebug("%s key:%p, %p move to trash, numOfElem in trash:%d", pCacheObj->name, pNode->key, pNode->data,
      pCacheObj->numOfElemsInTrash);
}

/**
 * Walk the trash list under the write lock and destroy its elements.
 *
 * @param pCacheObj  cache object owning the trash list
 * @param force      true: destroy every element regardless of its reference count;
 *                   false: destroy only elements whose node has a reference count of 0
 */
void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) {
  __cache_wr_lock(pCacheObj);

  if (pCacheObj->numOfElemsInTrash == 0) {
    // counter says "empty": a non-NULL head means the bookkeeping went wrong
    if (pCacheObj->pTrash != NULL) {
      uError("key:inconsistency data in cache, numOfElem in trash:%d", pCacheObj->numOfElemsInTrash);
    }

    pCacheObj->pTrash = NULL;
    __cache_unlock(pCacheObj);
    return;
  }

  STrashElem *pCur = pCacheObj->pTrash;

  while (pCur != NULL) {
    T_REF_VAL_CHECK(pCur->pData);

    // break a self-loop so the walk terminates
    if (pCur->next == pCur) {
      pCur->next = NULL;
    }

    // remember the successor before the current element may be freed
    STrashElem *pFollowing = pCur->next;

    if (force || (T_REF_VAL_GET(pCur->pData) == 0)) {
      uDebug("key:%p, %p removed from trash. numOfElem in trash:%d", pCur->pData->key, pCur->pData->data,
             pCacheObj->numOfElemsInTrash - 1);

      doRemoveElemInTrashcan(pCacheObj, pCur);
      doDestroyTrashcanElem(pCacheObj, pCur);
    }

    pCur = pFollowing;
  }

  __cache_unlock(pCacheObj);
}

/**
 * Release everything owned by the cache object, including the object itself:
 * the nodes in the hash table, the hash table, the trash (forced), the lock and the name.
 *
 * @param pCacheObj  cache object to destroy; must not be used afterwards
 */
void doCleanupDataCache(SCacheObj *pCacheObj) {
  // step 1: clear out the hash table (referenced nodes go to the trash)
  SHashTravSupp travParam = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()};
  taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &travParam);

  // todo memory leak if there are object with refcount greater than 0 in hash table?
  taosHashCleanup(pCacheObj->pHashTable);

  // step 2: destroy the trash without looking at reference counts
  taosTrashCanEmpty(pCacheObj, true);

  // step 3: the lock, the name and finally the object itself
  __cache_lock_destroy(pCacheObj);

  taosTFree(pCacheObj->name);
  memset(pCacheObj, 0, sizeof(SCacheObj));
  free(pCacheObj);
}
649

H
Haojun Liao 已提交
650 651 652
bool travHashTableFn(void* param, void* data) {
  SHashTravSupp* ps = (SHashTravSupp*) param;
  SCacheObj*     pCacheObj= ps->pCacheObj;
653

H
Haojun Liao 已提交
654 655 656
  SCacheDataNode* pNode = *(SCacheDataNode **) data;
  if (pNode->expireTime < ps->time && T_REF_VAL_GET(pNode) <= 0) {
    taosCacheReleaseNode(pCacheObj, pNode);
H
Haojun Liao 已提交
657

H
Haojun Liao 已提交
658 659 660
    // this node should be remove from hash table
    return false;
  }
661

H
Haojun Liao 已提交
662 663
  if (ps->fp) {
    (ps->fp)(pNode->data);
664 665
  }

H
Haojun Liao 已提交
666 667 668 669 670 671
  // do not remove element in hash table
  return true;
}

/**
 * Run travHashTableFn over every entry of the cache's hash table.
 *
 * @param pCacheObj  cache object; must not be NULL
 * @param time       reference time used to decide whether a node has expired
 * @param fp         optional callback for the data of nodes that are kept, may be NULL
 */
static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_free_fn_t fp) {
  assert(pCacheObj != NULL);

  SHashTravSupp travParam = {.pCacheObj = pCacheObj, .fp = fp, .time = time};
  taosHashCondTraverse(pCacheObj->pHashTable, travHashTableFn, &travParam);
}

void* taosCacheTimedRefresh(void *handle) {
  SCacheObj* pCacheObj = handle;
679
  if (pCacheObj == NULL) {
680
    uDebug("object is destroyed. no refresh retry");
681 682 683 684 685 686 687 688 689 690 691 692
    return NULL;
  }

  const int32_t SLEEP_DURATION = 500; //500 ms
  int64_t totalTick = pCacheObj->refreshTime / SLEEP_DURATION;

  int64_t count = 0;
  while(1) {
    taosMsleep(500);

    // check if current cache object will be deleted every 500ms.
    if (pCacheObj->deleting) {
693
      uDebug("%s refresh threads quit", pCacheObj->name);
694 695 696 697 698 699 700 701 702
      break;
    }

    if (++count < totalTick) {
      continue;
    }

    // reset the count value
    count = 0;
H
Haojun Liao 已提交
703 704
    size_t elemInHash = taosHashGetSize(pCacheObj->pHashTable);
    if (elemInHash + pCacheObj->numOfElemsInTrash == 0) {
705 706 707 708 709
      continue;
    }

    pCacheObj->statistics.refreshCount++;

H
Haojun Liao 已提交
710 711
    // refresh data in hash table
    if (elemInHash > 0) {
712 713
      int64_t now = taosGetTimestampMs();
      doCacheRefresh(pCacheObj, now, NULL);
H
Haojun Liao 已提交
714
    }
715 716 717 718 719

    taosTrashCanEmpty(pCacheObj, false);
  }

  return NULL;
dengyihao's avatar
dengyihao 已提交
720
}
721 722 723 724 725 726 727 728 729

/**
 * Refresh the hash table right now, using the current time as the expiry reference.
 *
 * @param pCacheObj  cache object; NULL is ignored
 * @param fp         optional callback applied to the data of every node that is kept
 */
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp) {
  if (pCacheObj != NULL) {
    doCacheRefresh(pCacheObj, taosGetTimestampMs(), fp);
  }
}