tcache.c 21.1 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16 17 18
#define _DEFAULT_SOURCE
#include "os.h"
#include "tulog.h"
H
hzcheng 已提交
19 20
#include "ttimer.h"
#include "tutil.h"
S
slguan 已提交
21 22 23
#include "tcache.h"
#include "hash.h"
#include "hashfunc.h"
H
hzcheng 已提交
24

25
/**
 * Take the cache lock for writing.
 * When LINUX is defined the lock is handled through the pthread rwlock API,
 * otherwise through the pthread mutex API.
 *
 * @param pCacheObj cache object that owns the lock
 */
static FORCE_INLINE void __cache_wr_lock(SCacheObj *pCacheObj) {
#ifndef LINUX
  pthread_mutex_lock(&pCacheObj->lock);
#else
  pthread_rwlock_wrlock(&pCacheObj->lock);
#endif
}

33
/**
 * Take the cache lock for reading.
 * When LINUX is defined this is a rwlock read lock; in other builds the same
 * mutex call as the write lock is used.
 *
 * @param pCacheObj cache object that owns the lock
 */
static FORCE_INLINE void __cache_rd_lock(SCacheObj *pCacheObj) {
#ifndef LINUX
  pthread_mutex_lock(&pCacheObj->lock);
#else
  pthread_rwlock_rdlock(&pCacheObj->lock);
#endif
}

41
/**
 * Release the cache lock taken by __cache_rd_lock() or __cache_wr_lock().
 *
 * @param pCacheObj cache object that owns the lock
 */
static FORCE_INLINE void __cache_unlock(SCacheObj *pCacheObj) {
#ifndef LINUX
  pthread_mutex_unlock(&pCacheObj->lock);
#else
  pthread_rwlock_unlock(&pCacheObj->lock);
#endif
}

49
/**
 * Initialize the cache lock with default attributes.
 *
 * @param pCacheObj cache object that owns the lock
 * @return          the value returned by the underlying pthread init call
 */
static FORCE_INLINE int32_t __cache_lock_init(SCacheObj *pCacheObj) {
#ifndef LINUX
  return pthread_mutex_init(&pCacheObj->lock, NULL);
#else
  return pthread_rwlock_init(&pCacheObj->lock, NULL);
#endif
}

57
/**
 * Destroy the cache lock created by __cache_lock_init().
 *
 * @param pCacheObj cache object that owns the lock
 */
static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pCacheObj) {
#ifndef LINUX
  pthread_mutex_destroy(&pCacheObj->lock);
#else
  pthread_rwlock_destroy(&pCacheObj->lock);
#endif
}

65
#if 0
/* Disabled helper: `data` holds the address of a node pointer; the node it refers to is freed. */
static FORCE_INLINE void taosFreeNode(void *data) {
  SCacheDataNode **ppNode = (SCacheDataNode **)data;
  free(*ppNode);
}
#endif
H
hzcheng 已提交
71 72 73 74 75 76 77

/**
 * @param key      key of object for hash, usually a null-terminated string
 * @param keyLen   length of key
 * @param pData    the actual data. It must be one contiguous memory block that contains no pointers,
 *                 because the block is copied byte-wise and copied pointers cause memory access errors.
 * @param size     size of block
78 79
 * @param duration total survival time of the node in milliseconds, counted from now
 * @return         SCacheDataNode
H
hzcheng 已提交
80
 */
81
static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration);
H
hzcheng 已提交
82 83

/**
84
 * add object node into trash; the object is closed for referencing once it has been added to trash
H
hzcheng 已提交
85
 * It will not be removed until pNode->refCount == 0
86
 * @param pCacheObj    Cache object
H
hzcheng 已提交
87 88
 * @param pNode   Cache slot object
 */
89 90 91 92 93 94 95 96
static void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode);

/**
 * remove node in trash can
 * @param pCacheObj 
 * @param pElem 
 */
static void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem);
H
hzcheng 已提交
97 98 99 100

/**
 * remove nodes in trash with refCount == 0 in cache
 * @param pNode
101
 * @param pCacheObj
H
hzcheng 已提交
102 103 104
 * @param force   force mode, if true, remove data in trash without checking the refcount.
 *                This may cause corruption, so force mode should only be applied right before the cache is closed
 */
105
static void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force);
H
hzcheng 已提交
106 107 108

/**
 * release node
109
 * @param pCacheObj      cache object
H
hzcheng 已提交
110 111
 * @param pNode     data node
 */
112
static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
H
hzcheng 已提交
113
  if (pNode->signature != (uint64_t)pNode) {
S
slguan 已提交
114
    uError("key:%s, %p data is invalid, or has been released", pNode->key, pNode);
H
hzcheng 已提交
115 116
    return;
  }
117
  
118
  taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
119

120
  pCacheObj->totalSize -= pNode->size;
H
Haojun Liao 已提交
121
  uDebug("cache:%s, key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes",
122 123 124
         pCacheObj->name, pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize,
         pNode->size);

S
Shengliang Guan 已提交
125
  if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data);
H
hzcheng 已提交
126 127 128 129 130
  free(pNode);
}

/**
 * move the old node into trash
131
 * @param pCacheObj
H
hzcheng 已提交
132 133
 * @param pNode
 */
134 135 136
static FORCE_INLINE void taosCacheMoveToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
  taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
  taosAddToTrash(pCacheObj, pNode);
H
hzcheng 已提交
137 138 139 140
}

/**
 * update data in cache
141
 * @param pCacheObj
H
hzcheng 已提交
142 143 144 145 146 147 148
 * @param pNode
 * @param key
 * @param keyLen
 * @param pData
 * @param dataSize
 * @return
 */
H
hjxilinx 已提交
149 150
static SCacheDataNode *taosUpdateCacheImpl(SCacheObj *pCacheObj, SCacheDataNode *pNode, const char *key, int32_t keyLen,
                                           const void *pData, uint32_t dataSize, uint64_t duration) {
151 152
  SCacheDataNode *pNewNode = NULL;
  
H
hjxilinx 已提交
153
  // only a node is not referenced by any other object, in-place update it
154
  if (T_REF_VAL_GET(pNode) == 0) {
155
    size_t newSize = sizeof(SCacheDataNode) + dataSize + keyLen + 1;
156 157
    
    pNewNode = (SCacheDataNode *)realloc(pNode, newSize);
H
hzcheng 已提交
158 159 160
    if (pNewNode == NULL) {
      return NULL;
    }
161
    
162
    memset(pNewNode, 0, newSize);
H
hzcheng 已提交
163 164
    pNewNode->signature = (uint64_t)pNewNode;
    memcpy(pNewNode->data, pData, dataSize);
165 166 167 168 169
    
    pNewNode->key = (char *)pNewNode + sizeof(SCacheDataNode) + dataSize;
    pNewNode->keySize = keyLen;
    memcpy(pNewNode->key, key, keyLen);
    
H
hjxilinx 已提交
170
    // update the timestamp information for updated key/value
171
    pNewNode->addedTime = taosGetTimestampMs();
H
Haojun Liao 已提交
172
    pNewNode->lifespan = duration;
173 174 175
    
    T_REF_INC(pNewNode);
    
H
hjxilinx 已提交
176
    // the address of this node may be changed, so the prev and next element should update the corresponding pointer
177
    taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNewNode, sizeof(void *));
H
hzcheng 已提交
178
  } else {
179
    taosCacheMoveToTrash(pCacheObj, pNode);
180
    
181
    pNewNode = taosCreateCacheNode(key, keyLen, pData, dataSize, duration);
H
hzcheng 已提交
182 183 184
    if (pNewNode == NULL) {
      return NULL;
    }
185 186 187 188
    
    T_REF_INC(pNewNode);
    
    // addedTime new element to hashtable
189
    taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNewNode, sizeof(void *));
H
hzcheng 已提交
190
  }
191
  
H
hzcheng 已提交
192 193 194 195
  return pNewNode;
}

/**
196
 * addedTime data into hash table
H
hzcheng 已提交
197 198 199
 * @param key
 * @param pData
 * @param size
200
 * @param pCacheObj
H
hzcheng 已提交
201 202 203 204
 * @param keyLen
 * @param pNode
 * @return
 */
H
hjxilinx 已提交
205
static FORCE_INLINE SCacheDataNode *taosAddToCacheImpl(SCacheObj *pCacheObj, const char *key, size_t keyLen, const void *pData,
206
                                                       size_t dataSize, uint64_t duration) {
207
  SCacheDataNode *pNode = taosCreateCacheNode(key, keyLen, pData, dataSize, duration);
H
hzcheng 已提交
208 209 210
  if (pNode == NULL) {
    return NULL;
  }
211 212
  
  T_REF_INC(pNode);
213
  taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode, sizeof(void *));
H
hzcheng 已提交
214 215 216
  return pNode;
}

217 218 219 220 221
/**
 * do cleanup the taos cache
 * @param pCacheObj
 */
static void doCleanupDataCache(SCacheObj *pCacheObj);
222

H
hzcheng 已提交
223
/**
224
 * refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime
H
hzcheng 已提交
225 226
 * @param handle   Cache object handle
 */
H
Haojun Liao 已提交
227
static void* taosCacheTimedRefresh(void *handle);
H
hjxilinx 已提交
228

229
/**
 * Create a cache object and start its background refresh thread.
 *
 * @param keyType              key type, used to pick the default hash function
 * @param refreshTimeInSeconds refresh interval in seconds, must be > 0
 * @param extendLifespan       stored in the cache object; consulted when data is released
 * @param fn                   optional callback invoked on the payload of a node before the node is freed
 * @param cacheName            name used in log messages, copied; must not be NULL
 * @return                     the new cache object, or NULL on invalid arguments or when any
 *                             resource (memory, hash table, lock, thread) could not be set up.
 *                             Nothing is leaked on failure.
 */
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn, const char* cacheName) {
  if (refreshTimeInSeconds <= 0 || cacheName == NULL) {
    return NULL;
  }

  SCacheObj *pCacheObj = (SCacheObj *)calloc(1, sizeof(SCacheObj));
  if (pCacheObj == NULL) {
    uError("failed to allocate memory, reason:%s", strerror(errno));
    return NULL;
  }

  pCacheObj->pHashTable = taosHashInit(128, taosGetDefaultHashFunction(keyType), false);
  if (pCacheObj->pHashTable == NULL) {
    // log before free(): free() is allowed to clobber errno
    uError("failed to allocate memory, reason:%s", strerror(errno));
    free(pCacheObj);
    return NULL;
  }

  pCacheObj->name = strdup(cacheName);
  if (pCacheObj->name == NULL) {
    uError("failed to allocate memory, reason:%s", strerror(errno));
    taosHashCleanup(pCacheObj->pHashTable);
    free(pCacheObj);
    return NULL;
  }

  // set free cache node callback function for hash table
  pCacheObj->freeFp = fn;
  pCacheObj->refreshTime = refreshTimeInSeconds * 1000;
  pCacheObj->extendLifespan = extendLifespan;

  // pthread calls report failures through their return value, not through errno
  int32_t code = __cache_lock_init(pCacheObj);
  if (code != 0) {
    uError("failed to init lock, reason:%s", strerror(code));
    taosHashCleanup(pCacheObj->pHashTable);
    free(pCacheObj->name);
    free(pCacheObj);
    return NULL;
  }

  pthread_attr_t thattr;
  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);

  code = pthread_create(&pCacheObj->refreshWorker, &thattr, taosCacheTimedRefresh, pCacheObj);
  pthread_attr_destroy(&thattr);

  if (code != 0) {
    // without a worker thread taosCacheCleanup() would join an invalid thread handle
    uError("failed to create refresh thread, reason:%s", strerror(code));
    __cache_lock_destroy(pCacheObj);
    taosHashCleanup(pCacheObj->pHashTable);
    free(pCacheObj->name);
    free(pCacheObj);
    return NULL;
  }

  return pCacheObj;
}
H
hzcheng 已提交
270

H
Haojun Liao 已提交
271
/**
 * Insert data under key, or replace the data already stored under key.
 *
 * The returned payload has its reference count incremented once on behalf of the caller.
 *
 * @param pCacheObj cache object
 * @param key       key of the entry
 * @param keyLen    length of key in bytes
 * @param pData     payload, copied into the cache
 * @param dataSize  size of pData in bytes
 * @param duration  lifespan of the entry in seconds
 * @return          pointer to the cached copy of the data, or NULL if the cache is
 *                  invalid or memory could not be allocated
 */
void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int duration) {
  if (pCacheObj == NULL || pCacheObj->pHashTable == NULL) {
    return NULL;
  }

  // widen before multiplying: seconds -> milliseconds must not overflow where long is 32 bits
  const int64_t lifespanMs = (int64_t)duration * 1000;

  SCacheDataNode *pNode = NULL;
  void           *pResult = NULL;

  __cache_wr_lock(pCacheObj);
  SCacheDataNode **pt = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
  SCacheDataNode * pOld = (pt != NULL) ? (*pt) : NULL;

  if (pOld == NULL) {  // do add to cache
    pNode = taosAddToCacheImpl(pCacheObj, key, keyLen, pData, dataSize, lifespanMs);
    if (NULL != pNode) {
      pCacheObj->totalSize += pNode->size;

      uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", totalNum:%d totalSize:%" PRId64
             "bytes size:%" PRId64 "bytes",
             pCacheObj->name, key, pNode->data, pNode->addedTime, pNode->expireTime,
             (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, (int64_t)dataSize);
    } else {
      uError("cache:%s, key:%p, failed to added into cache, out of memory", pCacheObj->name, key);
    }
  } else {  // old data exists, update the node
    pNode = taosUpdateCacheImpl(pCacheObj, pOld, key, keyLen, pData, dataSize, lifespanMs);
    if (NULL != pNode) {
      uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode->data, pOld);
    } else {
      // the update can fail on allocation; pNode must not be dereferenced in that case
      uError("cache:%s, key:%p, failed to update cache, out of memory", pCacheObj->name, key);
    }
  }

  if (pNode != NULL) {
    pResult = pNode->data;
  }

  __cache_unlock(pCacheObj);

  return pResult;
}

H
Haojun Liao 已提交
304
/**
 * Look up key and take a reference on the cached data.
 *
 * The hit/miss/total access counters in pCacheObj->statistics are updated.
 *
 * @param pCacheObj cache object
 * @param key       key to look up
 * @param keyLen    length of key in bytes
 * @return          pointer to the cached data with its reference count incremented,
 *                  or NULL if the cache is NULL, empty, or has no such key
 */
void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) {
  if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) {
    return NULL;
  }

  void   *pFound = NULL;
  int32_t refCnt = 0;

  __cache_rd_lock(pCacheObj);

  SCacheDataNode **ppNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
  if (ppNode != NULL) {
    refCnt = T_REF_INC(*ppNode);
    pFound = (*ppNode)->data;
  }

  __cache_unlock(pCacheObj);

  if (pFound == NULL) {
    atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
    uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
  } else {
    atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
    uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pFound, refCnt);
  }

  atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
  return pFound;
}

335
/**
 * Look up key, take a reference on the cached data and overwrite its expire time.
 *
 * The hit/miss/total access counters in pCacheObj->statistics are updated.
 *
 * @param pCacheObj  cache object
 * @param key        key to look up
 * @param keyLen     length of key in bytes
 * @param expireTime new value stored in the node's expireTime field
 * @return           pointer to the cached data with its reference count incremented,
 *                   or NULL if the cache is NULL, empty, or has no such key
 */
void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, void *key, size_t keyLen, uint64_t expireTime) {
  if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) {
    return NULL;
  }

  void   *pData = NULL;
  int32_t ref = 0;

  __cache_rd_lock(pCacheObj);

  SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
  if (ptNode != NULL) {
    // ptNode points into hash table storage and is only meaningful while the lock is held,
    // so everything needed later is captured here
    SCacheDataNode *pNode = *ptNode;

    ref = T_REF_INC(pNode);
    // several readers may hold the shared lock at once; store atomically like taosCacheRelease() does
    atomic_store_64(&pNode->expireTime, expireTime);
    pData = pNode->data;
  }

  __cache_unlock(pCacheObj);

  if (pData != NULL) {
    atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
    uDebug("cache:%s, key:%p, %p expireTime is updated in cache, refcnt:%d", pCacheObj->name, key, pData, ref);
  } else {
    atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
    uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
  }

  atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
  return pData;
}

363 364
/**
 * Take an additional reference on data that was previously obtained from the cache.
 *
 * @param pCacheObj cache object
 * @param data      pointer previously returned by the cache
 * @return          data itself, or NULL if an argument is NULL or the node
 *                  signature in front of data does not match
 */
void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
  if (pCacheObj == NULL || data == NULL) {
    return NULL;
  }

  // step back from the payload to the node header that precedes it
  SCacheDataNode *pHeader = (SCacheDataNode *)((char *)data - offsetof(SCacheDataNode, data));

  if (pHeader->signature != (uint64_t)pHeader) {
    uError("key: %p the data from cache is invalid", pHeader);
    return NULL;
  }

  int32_t refCnt = T_REF_INC(pHeader);
  uDebug("cache:%s, data: %p acquired by data in cache, refcnt:%d", pCacheObj->name, pHeader->data, refCnt);

  // the data is already referenced by at least one object, so after the increment the count must be at least 2
  assert(refCnt >= 2);
  return data;
}

382
/**
 * Hand the caller's reference over to a new holder: the value of *data is
 * returned and *data is cleared. The reference count is not changed.
 *
 * @param pCacheObj cache object
 * @param data      address of a pointer previously returned by the cache
 * @return          the former value of *data, or NULL if an argument is NULL
 *                  or the node signature does not match
 */
void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) {
  if (pCacheObj == NULL || data == NULL || (*data) == NULL) {
    return NULL;
  }

  SCacheDataNode *pHeader = (SCacheDataNode *)((char *)(*data) - offsetof(SCacheDataNode, data));

  if (pHeader->signature != (uint64_t)pHeader) {
    uError("key: %p the data from cache is invalid", pHeader);
    return NULL;
  }

  assert(T_REF_VAL_GET(pHeader) >= 1);

  // take the pointer over and clear the old holder's copy
  char *pTaken = *data;
  *data = NULL;

  return pTaken;
}
402

403 404
/**
 * Give back one reference on cached data and clear the caller's pointer.
 *
 * @param pCacheObj cache object
 * @param data      address of a pointer previously returned by the cache; *data is set to NULL
 * @param _remove   if true the entry is also taken out of the cache: it is freed right away when
 *                  no other reference is left, otherwise it is moved to the trash
 */
void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
  if (pCacheObj == NULL || data == NULL || (*data) == NULL ||
      (taosHashGetSize(pCacheObj->pHashTable) + pCacheObj->numOfElemsInTrash == 0)) {
    return;
  }

  size_t offset = offsetof(SCacheDataNode, data);

  SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset);
  if (pNode->signature != (uint64_t)pNode) {
    uError("%p, release invalid cache data", pNode);
    return;
  }

  *data = NULL;

  // note: extend lifespan before dec ref count
  bool inTrashCan = pNode->inTrashCan;

  if (pCacheObj->extendLifespan && (!inTrashCan) && (!_remove)) {
    atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs());
    uDebug("cache:%s data:%p extend life time to %"PRId64 "  before release", pCacheObj->name, pNode->data, pNode->expireTime);
  }

  if (_remove) {
    __cache_wr_lock(pCacheObj);

    // NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
    int32_t ref = T_REF_DEC(pNode);
    uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, ref);

    /*
     * If it is not referenced by other users, remove it immediately. Otherwise move this node to trashcan wait for all users
     * releasing this resources.
     *
     * NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread
     * that tries to do the same thing.
     */
    if (pNode->inTrashCan) {
      if (ref == 0) {
        assert(pNode->pTNodeHeader->pData == pNode);
        taosRemoveFromTrashCan(pCacheObj, pNode->pTNodeHeader);
      }
    } else {
      if (ref > 0) {
        assert(pNode->pTNodeHeader == NULL);
        taosCacheMoveToTrash(pCacheObj, pNode);
      } else {
        taosCacheReleaseNode(pCacheObj, pNode);
      }
    }

    __cache_unlock(pCacheObj);

  } else {
    // NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
    // Everything the log line needs is therefore read BEFORE the decrement; only the
    // addresses are logged, the node itself is not touched afterwards.
    void *pKey  = pNode->key;
    void *pData = pNode->data;

    int32_t ref = T_REF_DEC(pNode);

    uDebug("cache:%s, key:%p, %p released, refcnt:%d, data in trancan:%d", pCacheObj->name, pKey, pData,
           ref, inTrashCan);
  }
}

465
/**
 * Drop every entry from the hash table: unreferenced nodes are freed, referenced
 * nodes are moved to the trash. Afterwards the trash is scanned once (non-forced).
 * The walk stops early when pCacheObj->deleting is 1.
 *
 * @param pCacheObj cache object
 */
void taosCacheEmpty(SCacheObj *pCacheObj) {
  SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);

  __cache_wr_lock(pCacheObj);

  // the iterator is advanced first, then the deleting flag is consulted
  while (taosHashIterNext(pIter) && pCacheObj->deleting != 1) {
    SCacheDataNode *pCurrent = *(SCacheDataNode **) taosHashIterGet(pIter);

    if (T_REF_VAL_GET(pCurrent) != 0) {
      taosCacheMoveToTrash(pCacheObj, pCurrent);
    } else {
      taosCacheReleaseNode(pCacheObj, pCurrent);
    }
  }

  __cache_unlock(pCacheObj);

  taosHashDestroyIter(pIter);
  taosTrashCanEmpty(pCacheObj, false);
}

/**
 * Shut the cache down: flag it as deleting, wait for the refresh thread to
 * finish, then release all resources through doCleanupDataCache().
 *
 * @param pCacheObj cache object, may be NULL (no-op)
 */
void taosCacheCleanup(SCacheObj *pCacheObj) {
  if (pCacheObj != NULL) {
    // the refresh thread polls this flag and exits once it is set
    pCacheObj->deleting = 1;
    pthread_join(pCacheObj->refreshWorker, NULL);

    uInfo("cache:%s will be cleaned up", pCacheObj->name);
    doCleanupDataCache(pCacheObj);
  }
}

SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size,
                                           uint64_t duration) {
501
  size_t totalSize = size + sizeof(SCacheDataNode) + keyLen;
502 503 504 505 506 507 508 509 510 511 512 513 514 515

  SCacheDataNode *pNewNode = calloc(1, totalSize);
  if (pNewNode == NULL) {
    uError("failed to allocate memory, reason:%s", strerror(errno));
    return NULL;
  }

  memcpy(pNewNode->data, pData, size);

  pNewNode->key = (char *)pNewNode + sizeof(SCacheDataNode) + size;
  pNewNode->keySize = keyLen;

  memcpy(pNewNode->key, key, keyLen);

H
Haojun Liao 已提交
516 517
  pNewNode->addedTime    = (uint64_t)taosGetTimestampMs();
  pNewNode->lifespan     = duration;
518
  pNewNode->expireTime   = pNewNode->addedTime + pNewNode->lifespan;
H
Haojun Liao 已提交
519 520
  pNewNode->signature    = (uint64_t)pNewNode;
  pNewNode->size         = (uint32_t)totalSize;
521 522 523 524 525 526

  return pNewNode;
}

/**
 * Put a node at the head of the trash list (a doubly linked list of STrashElem).
 * A node that is already flagged as being in the trash is left alone.
 *
 * NOTE(review): the calloc() result is not checked before use.
 *
 * @param pCacheObj cache object that owns the trash list
 * @param pNode     node to park in the trash
 */
void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
  if (pNode->inTrashCan) { /* node is already in trash */
    assert(pNode->pTNodeHeader != NULL && pNode->pTNodeHeader->pData == pNode);
    return;
  }

  STrashElem *pEntry = calloc(1, sizeof(STrashElem));
  STrashElem *pOldHead = pCacheObj->pTrash;

  // wire the new element in front of the current head
  pEntry->pData = pNode;
  pEntry->next = pOldHead;
  if (pOldHead) {
    pOldHead->prev = pEntry;
  }
  pEntry->prev = NULL;
  pCacheObj->pTrash = pEntry;

  // let the node know where it lives in the trash
  pNode->inTrashCan = true;
  pNode->pTNodeHeader = pEntry;
  pCacheObj->numOfElemsInTrash++;

  uDebug("key:%p, %p move to trash, numOfElem in trash:%d", pNode->key, pNode->data, pCacheObj->numOfElemsInTrash);
}

/**
 * Unlink an element from the trash list and free both the element and the node it carries.
 * The free callback of the cache, if set, is invoked on the node payload first.
 * If the node signature does not match, an error is logged and nothing is changed.
 *
 * @param pCacheObj cache object that owns the trash list
 * @param pElem     trash element to remove
 */
void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem) {
  SCacheDataNode *pVictim = pElem->pData;

  if (pVictim->signature != (uint64_t)pVictim) {
    uError("key:sig:0x%" PRIx64 " %p data has been released, ignore", pVictim->signature, pVictim);
    return;
  }

  pCacheObj->numOfElemsInTrash--;

  // detach from the predecessor, or move the list head when pElem was the first element
  if (pElem->prev == NULL) {
    pCacheObj->pTrash = pElem->next;
  } else {
    pElem->prev->next = pElem->next;
  }

  // detach from the successor
  if (pElem->next != NULL) {
    pElem->next->prev = pElem->prev;
  }

  // mark the node as released before its memory goes away
  pVictim->signature = 0;
  if (pCacheObj->freeFp) {
    pCacheObj->freeFp(pVictim->data);
  }

  free(pVictim);
  free(pElem);
}

H
Haojun Liao 已提交
575
// TODO add another lock when scanning trashcan
576 577 578 579 580 581 582 583 584 585 586
void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) {
  __cache_wr_lock(pCacheObj);

  if (pCacheObj->numOfElemsInTrash == 0) {
    if (pCacheObj->pTrash != NULL) {
      uError("key:inconsistency data in cache, numOfElem in trash:%d", pCacheObj->numOfElemsInTrash);
    }
    pCacheObj->pTrash = NULL;

    __cache_unlock(pCacheObj);
    return;
H
hjxilinx 已提交
587
  }
588 589 590 591 592 593 594 595 596 597

  STrashElem *pElem = pCacheObj->pTrash;

  while (pElem) {
    T_REF_VAL_CHECK(pElem->pData);
    if (pElem->next == pElem) {
      pElem->next = NULL;
    }

    if (force || (T_REF_VAL_GET(pElem->pData) == 0)) {
598
      uDebug("key:%p, %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData->data,
599 600 601 602 603 604 605 606 607 608 609 610 611 612 613
             pCacheObj->numOfElemsInTrash - 1);
      STrashElem *p = pElem;

      pElem = pElem->next;
      taosRemoveFromTrashCan(pCacheObj, p);
    } else {
      pElem = pElem->next;
    }
  }

  __cache_unlock(pCacheObj);
}

/**
 * Tear the cache object down: release every unreferenced node in the hash table,
 * destroy the hash table, force-empty the trash, destroy the lock and free the
 * cache object together with its name.
 *
 * Nodes in the hash table that are still referenced are only logged, not released here.
 *
 * @param pCacheObj cache object; invalid after this call
 */
void doCleanupDataCache(SCacheObj *pCacheObj) {
  __cache_wr_lock(pCacheObj);

  SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);
  while (taosHashIterNext(pIter)) {
    SCacheDataNode *pCurrent = *(SCacheDataNode **)taosHashIterGet(pIter);

    int32_t refCnt = T_REF_VAL_GET(pCurrent);
    if (refCnt > 0) {
      uDebug("cache:%s key:%p, %p will not remove from cache, refcnt:%d", pCacheObj->name, pCurrent->key,
          pCurrent->data, T_REF_VAL_GET(pCurrent));
    } else {
      taosCacheReleaseNode(pCacheObj, pCurrent);
    }
  }
  taosHashDestroyIter(pIter);

  // todo memory leak if there are object with refcount greater than 0 in hash table?
  taosHashCleanup(pCacheObj->pHashTable);
  __cache_unlock(pCacheObj);

  // force mode: trash elements go away regardless of their reference count
  taosTrashCanEmpty(pCacheObj, true);
  __cache_lock_destroy(pCacheObj);

  taosTFree(pCacheObj->name);

  // wipe the object before handing the memory back
  memset(pCacheObj, 0, sizeof(SCacheObj));
  free(pCacheObj);
}
640

641 642 643 644 645 646
/**
 * Walk the hash table under the write lock. A node whose expireTime is below
 * `time` and whose reference count is <= 0 is released; every other node is
 * passed to fp (when fp is not NULL).
 *
 * @param pCacheObj cache object
 * @param time      timestamp that expireTime is compared against
 * @param fp        optional callback invoked on the payload of every node that is kept
 */
static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_free_fn_t fp) {
  SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);

  __cache_wr_lock(pCacheObj);

  while (taosHashIterNext(pIter)) {
    SCacheDataNode *pCurrent = *(SCacheDataNode **)taosHashIterGet(pIter);

    if (pCurrent->expireTime < time && T_REF_VAL_GET(pCurrent) <= 0) {
      // expired and unreferenced
      taosCacheReleaseNode(pCacheObj, pCurrent);
    } else if (fp) {
      fp(pCurrent->data);
    }
  }

  __cache_unlock(pCacheObj);

  taosHashDestroyIter(pIter);
}

void* taosCacheTimedRefresh(void *handle) {
  SCacheObj* pCacheObj = handle;
665
  if (pCacheObj == NULL) {
666
    uDebug("object is destroyed. no refresh retry");
667 668 669 670 671 672 673 674 675 676 677 678
    return NULL;
  }

  const int32_t SLEEP_DURATION = 500; //500 ms
  int64_t totalTick = pCacheObj->refreshTime / SLEEP_DURATION;

  int64_t count = 0;
  while(1) {
    taosMsleep(500);

    // check if current cache object will be deleted every 500ms.
    if (pCacheObj->deleting) {
679
      uDebug("%s refresh threads quit", pCacheObj->name);
680 681 682 683 684 685 686 687 688
      break;
    }

    if (++count < totalTick) {
      continue;
    }

    // reset the count value
    count = 0;
H
Haojun Liao 已提交
689 690
    size_t elemInHash = taosHashGetSize(pCacheObj->pHashTable);
    if (elemInHash + pCacheObj->numOfElemsInTrash == 0) {
691 692 693 694 695
      continue;
    }

    pCacheObj->statistics.refreshCount++;

H
Haojun Liao 已提交
696 697
    // refresh data in hash table
    if (elemInHash > 0) {
698 699
      int64_t now = taosGetTimestampMs();
      doCacheRefresh(pCacheObj, now, NULL);
H
Haojun Liao 已提交
700
    }
701 702 703 704 705

    taosTrashCanEmpty(pCacheObj, false);
  }

  return NULL;
dengyihao's avatar
dengyihao 已提交
706
}
707 708 709 710 711 712 713 714 715

/**
 * Run one refresh pass over the hash table right now, using the current
 * timestamp in milliseconds as the expiry threshold.
 *
 * @param pCacheObj cache object, may be NULL (no-op)
 * @param fp        optional callback forwarded to doCacheRefresh()
 */
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp) {
  if (pCacheObj != NULL) {
    doCacheRefresh(pCacheObj, taosGetTimestampMs(), fp);
  }
}