tcache.c 21.1 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16 17 18
#define _DEFAULT_SOURCE
#include "os.h"
#include "tulog.h"
H
hzcheng 已提交
19 20 21
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
S
slguan 已提交
22 23 24
#include "tcache.h"
#include "hash.h"
#include "hashfunc.h"
H
hzcheng 已提交
25

26
/* Take the cache lock for exclusive access: the write side of the rwlock on LINUX builds, the plain mutex otherwise. */
static FORCE_INLINE void __cache_wr_lock(SCacheObj *pCacheObj) {
#if !defined(LINUX)
  pthread_mutex_lock(&pCacheObj->lock);
#else
  pthread_rwlock_wrlock(&pCacheObj->lock);
#endif
}

34
/* Take the cache lock for reading: the read side of the rwlock on LINUX builds, the plain mutex otherwise. */
static FORCE_INLINE void __cache_rd_lock(SCacheObj *pCacheObj) {
#if !defined(LINUX)
  pthread_mutex_lock(&pCacheObj->lock);
#else
  pthread_rwlock_rdlock(&pCacheObj->lock);
#endif
}

42
/* Release the cache lock taken by __cache_rd_lock or __cache_wr_lock. */
static FORCE_INLINE void __cache_unlock(SCacheObj *pCacheObj) {
#if !defined(LINUX)
  pthread_mutex_unlock(&pCacheObj->lock);
#else
  pthread_rwlock_unlock(&pCacheObj->lock);
#endif
}

50
/*
 * Initialise the cache lock with default attributes (rwlock on LINUX builds, mutex otherwise).
 * Returns the value of the underlying pthread init call.
 */
static FORCE_INLINE int32_t __cache_lock_init(SCacheObj *pCacheObj) {
#if !defined(LINUX)
  return pthread_mutex_init(&pCacheObj->lock, NULL);
#else
  return pthread_rwlock_init(&pCacheObj->lock, NULL);
#endif
}

58
/* Destroy the cache lock created by __cache_lock_init. */
static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pCacheObj) {
#if !defined(LINUX)
  pthread_mutex_destroy(&pCacheObj->lock);
#else
  pthread_rwlock_destroy(&pCacheObj->lock);
#endif
}

66
#if 0
/* Compiled out: frees the cache node whose address is stored in the slot `data` points to. */
static FORCE_INLINE void taosFreeNode(void *data) {
  free(*(SCacheDataNode **)data);
}
#endif
H
hzcheng 已提交
72 73 74 75 76 77 78

/**
 * @param key      key of object for hash, usually a null-terminated string
 * @param keyLen   length of key
 * @param pData    the actual data. It must be one contiguous memory block containing no
 *                 pointers: the bytes are copied as-is, so copied pointers cause memory access errors.
 * @param size     size of block
79 80
 * @param duration total survival time of the node from now, in milliseconds
 * @return         SCacheDataNode
H
hzcheng 已提交
81
 */
82
static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration);
H
hzcheng 已提交
83 84

/**
85
 * add an object node into the trash; once it is in the trash, the object is closed for new references
H
hzcheng 已提交
86
 * It will not be removed until the reference count of pNode drops to 0
87
 * @param pCacheObj    Cache object
H
hzcheng 已提交
88 89
 * @param pNode   Cache slot object
 */
90 91 92 93 94 95 96 97
static void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode);

/**
 * remove node in trash can
 * @param pCacheObj 
 * @param pElem 
 */
static void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem);
H
hzcheng 已提交
98 99 100 101

/**
 * remove nodes in trash with refCount == 0 in cache
 * @param pNode
102
 * @param pCacheObj
H
hzcheng 已提交
103 104 105
 * @param force   force mode: if true, remove data in the trash without checking the refcount.
 *                This may cause corruption, so force mode applies only right before the cache is closed
 */
106
static void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force);
H
hzcheng 已提交
107 108 109

/**
 * release node
110
 * @param pCacheObj      cache object
H
hzcheng 已提交
111 112
 * @param pNode     data node
 */
113
static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
H
hzcheng 已提交
114
  if (pNode->signature != (uint64_t)pNode) {
S
slguan 已提交
115
    uError("key:%s, %p data is invalid, or has been released", pNode->key, pNode);
H
hzcheng 已提交
116 117
    return;
  }
118
  
119
  taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
120

121
  pCacheObj->totalSize -= pNode->size;
H
Haojun Liao 已提交
122
  uDebug("cache:%s, key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes",
123 124 125
         pCacheObj->name, pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize,
         pNode->size);

S
Shengliang Guan 已提交
126
  if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data);
H
hzcheng 已提交
127 128 129 130 131
  free(pNode);
}

/**
 * move the old node into trash
132
 * @param pCacheObj
H
hzcheng 已提交
133 134
 * @param pNode
 */
135 136 137
static FORCE_INLINE void taosCacheMoveToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
  taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
  taosAddToTrash(pCacheObj, pNode);
H
hzcheng 已提交
138 139 140 141
}

/**
 * update data in cache
142
 * @param pCacheObj
H
hzcheng 已提交
143 144 145 146 147 148 149
 * @param pNode
 * @param key
 * @param keyLen
 * @param pData
 * @param dataSize
 * @return
 */
H
hjxilinx 已提交
150 151
static SCacheDataNode *taosUpdateCacheImpl(SCacheObj *pCacheObj, SCacheDataNode *pNode, const char *key, int32_t keyLen,
                                           const void *pData, uint32_t dataSize, uint64_t duration) {
152 153
  SCacheDataNode *pNewNode = NULL;
  
H
hjxilinx 已提交
154
  // only a node is not referenced by any other object, in-place update it
155
  if (T_REF_VAL_GET(pNode) == 0) {
156
    size_t newSize = sizeof(SCacheDataNode) + dataSize + keyLen + 1;
157 158
    
    pNewNode = (SCacheDataNode *)realloc(pNode, newSize);
H
hzcheng 已提交
159 160 161
    if (pNewNode == NULL) {
      return NULL;
    }
162
    
163
    memset(pNewNode, 0, newSize);
H
hzcheng 已提交
164 165
    pNewNode->signature = (uint64_t)pNewNode;
    memcpy(pNewNode->data, pData, dataSize);
166 167 168 169 170
    
    pNewNode->key = (char *)pNewNode + sizeof(SCacheDataNode) + dataSize;
    pNewNode->keySize = keyLen;
    memcpy(pNewNode->key, key, keyLen);
    
H
hjxilinx 已提交
171
    // update the timestamp information for updated key/value
172
    pNewNode->addedTime = taosGetTimestampMs();
H
Haojun Liao 已提交
173
    pNewNode->lifespan = duration;
174 175 176
    
    T_REF_INC(pNewNode);
    
H
hjxilinx 已提交
177
    // the address of this node may be changed, so the prev and next element should update the corresponding pointer
178
    taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNewNode, sizeof(void *));
H
hzcheng 已提交
179
  } else {
180
    taosCacheMoveToTrash(pCacheObj, pNode);
181
    
182
    pNewNode = taosCreateCacheNode(key, keyLen, pData, dataSize, duration);
H
hzcheng 已提交
183 184 185
    if (pNewNode == NULL) {
      return NULL;
    }
186 187 188 189
    
    T_REF_INC(pNewNode);
    
    // addedTime new element to hashtable
190
    taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNewNode, sizeof(void *));
H
hzcheng 已提交
191
  }
192
  
H
hzcheng 已提交
193 194 195 196
  return pNewNode;
}

/**
197
 * addedTime data into hash table
H
hzcheng 已提交
198 199 200
 * @param key
 * @param pData
 * @param size
201
 * @param pCacheObj
H
hzcheng 已提交
202 203 204 205
 * @param keyLen
 * @param pNode
 * @return
 */
H
hjxilinx 已提交
206
static FORCE_INLINE SCacheDataNode *taosAddToCacheImpl(SCacheObj *pCacheObj, const char *key, size_t keyLen, const void *pData,
207
                                                       size_t dataSize, uint64_t duration) {
208
  SCacheDataNode *pNode = taosCreateCacheNode(key, keyLen, pData, dataSize, duration);
H
hzcheng 已提交
209 210 211
  if (pNode == NULL) {
    return NULL;
  }
212 213
  
  T_REF_INC(pNode);
214
  taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode, sizeof(void *));
H
hzcheng 已提交
215 216 217
  return pNode;
}

218 219 220 221 222
/**
 * do cleanup the taos cache
 * @param pCacheObj
 */
static void doCleanupDataCache(SCacheObj *pCacheObj);
223

H
hzcheng 已提交
224
/**
225
 * refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime
H
hzcheng 已提交
226 227
 * @param handle   Cache object handle
 */
H
Haojun Liao 已提交
228
static void* taosCacheTimedRefresh(void *handle);
H
hjxilinx 已提交
229

230
/**
 * Create a cache object and start its background refresh thread.
 *
 * @param keyType               key type, used to select the default hash function
 * @param refreshTimeInSeconds  refresh period in seconds; must be > 0
 * @param extendLifespan        if true, releasing an entry pushes its expire time forward
 * @param fn                    optional callback run on a node's payload before the node is freed
 * @param cacheName             name used in log messages; NULL is treated as an empty name
 * @return                      the new cache object, or NULL if the period is invalid or any
 *                              resource (memory, lock, refresh thread) could not be set up
 */
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn, const char* cacheName) {
  if (refreshTimeInSeconds <= 0) {
    return NULL;
  }

  SCacheObj *pCacheObj = (SCacheObj *)calloc(1, sizeof(SCacheObj));
  if (pCacheObj == NULL) {
    uError("failed to allocate memory, reason:%s", strerror(errno));
    return NULL;
  }

  pCacheObj->pHashTable = taosHashInit(128, taosGetDefaultHashFunction(keyType), false);
  if (pCacheObj->pHashTable == NULL) {
    // log first: free() is allowed to change errno
    uError("failed to allocate memory, reason:%s", strerror(errno));
    free(pCacheObj);
    return NULL;
  }

  pCacheObj->name = strdup((cacheName != NULL) ? cacheName : "");
  if (pCacheObj->name == NULL) {
    uError("failed to allocate memory, reason:%s", strerror(errno));
    taosHashCleanup(pCacheObj->pHashTable);
    free(pCacheObj);
    return NULL;
  }

  // set free cache node callback function for hash table
  pCacheObj->freeFp = fn;
  pCacheObj->refreshTime = refreshTimeInSeconds * 1000;
  pCacheObj->extendLifespan = extendLifespan;

  // pthread functions report failure through their return value, not through errno
  int32_t code = __cache_lock_init(pCacheObj);
  if (code != 0) {
    uError("failed to init lock, reason:%s", strerror(code));
    taosHashCleanup(pCacheObj->pHashTable);
    free(pCacheObj->name);
    free(pCacheObj);
    return NULL;
  }

  // The worker must be joinable: taosCacheCleanup joins it before tearing the cache down.
  pthread_attr_t thattr;
  code = pthread_attr_init(&thattr);
  if (code == 0) {
    pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
    code = pthread_create(&pCacheObj->refreshWorker, &thattr, taosCacheTimedRefresh, pCacheObj);
    pthread_attr_destroy(&thattr);
  }

  if (code != 0) {
    // without a worker, a later pthread_join in taosCacheCleanup would act on an invalid thread handle
    uError("cache:%s, failed to create refresh thread, reason:%s", pCacheObj->name, strerror(code));
    __cache_lock_destroy(pCacheObj);
    taosHashCleanup(pCacheObj->pHashTable);
    free(pCacheObj->name);
    free(pCacheObj);
    return NULL;
  }

  return pCacheObj;
}
H
hzcheng 已提交
271

H
Haojun Liao 已提交
272
/**
 * Insert a key/value pair into the cache, or replace the value of an existing key.
 *
 * The payload is copied into a cache-owned node. The returned pointer refers to that
 * copy and holds one reference.
 *
 * @param pCacheObj  cache object
 * @param key        key of the entry
 * @param keyLen     length of key in bytes
 * @param pData      payload to copy
 * @param dataSize   size of the payload in bytes
 * @param duration   lifespan of the entry in seconds
 * @return           pointer to the cached copy of the payload, or NULL if the cache is
 *                   invalid or memory could not be allocated
 */
void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int duration) {
  if (pCacheObj == NULL || pCacheObj->pHashTable == NULL) {
    return NULL;
  }

  // Widen before scaling: `duration * 1000L` overflows on platforms where long is 32 bits.
  uint64_t lifespanMs = (uint64_t)((int64_t)duration * 1000);

  SCacheDataNode *pNode = NULL;

  __cache_wr_lock(pCacheObj);
  SCacheDataNode **pt = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
  SCacheDataNode * pOld = (pt != NULL) ? (*pt) : NULL;

  if (pOld == NULL) {  // key is not cached yet: add it
    pNode = taosAddToCacheImpl(pCacheObj, key, keyLen, pData, dataSize, lifespanMs);
    if (NULL != pNode) {
      pCacheObj->totalSize += pNode->size;

      uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", totalNum:%d totalSize:%" PRId64
             "bytes size:%" PRId64 "bytes",
             pCacheObj->name, key, pNode->data, pNode->addedTime, pNode->expireTime,
             (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, (int64_t)dataSize);
    } else {
      uError("cache:%s, key:%p, failed to added into cache, out of memory", pCacheObj->name, key);
    }
  } else {  // old data exists, update the node
    pNode = taosUpdateCacheImpl(pCacheObj, pOld, key, keyLen, pData, dataSize, lifespanMs);
    if (NULL != pNode) {
      // pOld is logged as an address only; it may already have been freed by the update
      uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode->data, pOld);
    } else {
      // the update can fail on allocation; pNode must not be dereferenced in that case
      uError("cache:%s, key:%p, failed to update cache, out of memory", pCacheObj->name, key);
    }
  }

  void *pResult = (pNode != NULL) ? pNode->data : NULL;
  __cache_unlock(pCacheObj);

  return pResult;
}

H
Haojun Liao 已提交
305
/**
 * Look up a key and take a reference on the matching entry.
 *
 * Updates the hit/miss/total access statistics.
 *
 * @param pCacheObj  cache object
 * @param key        key to look up
 * @param keyLen     length of key in bytes
 * @return           pointer to the cached payload with one extra reference taken,
 *                   or NULL if the cache is NULL, empty, or does not hold the key
 */
void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) {
  if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) {
    return NULL;
  }

  void   *pFound = NULL;
  int32_t refCnt = 0;

  __cache_rd_lock(pCacheObj);

  SCacheDataNode **ppNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
  if (ppNode != NULL) {
    refCnt = T_REF_INC(*ppNode);
    pFound = (*ppNode)->data;
  }

  __cache_unlock(pCacheObj);

  if (pFound == NULL) {
    atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
    uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
  } else {
    atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
    uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pFound, refCnt);
  }

  atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
  return pFound;
}

336
/**
 * Look up a key, take a reference on the matching entry and overwrite its expire time.
 *
 * Updates the hit/miss/total access statistics.
 *
 * @param pCacheObj   cache object
 * @param key         key to look up
 * @param keyLen      length of key in bytes
 * @param expireTime  new expire time stored in the node
 * @return            pointer to the cached payload with one extra reference taken,
 *                    or NULL if the cache is NULL, empty, or does not hold the key
 */
void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, void *key, size_t keyLen, uint64_t expireTime) {
  if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) {
    return NULL;
  }

  void   *pData = NULL;
  int32_t ref = 0;

  __cache_rd_lock(pCacheObj);

  SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
  if (ptNode != NULL) {
    // Everything needed later is captured here: ptNode points into the hash table, and
    // that slot may be changed by another thread as soon as the lock is dropped.
    SCacheDataNode *pNode = *ptNode;

    ref = T_REF_INC(pNode);
    // several readers may get here at once under the shared lock, so store atomically,
    // as taosCacheRelease does for the same field
    atomic_store_64(&pNode->expireTime, expireTime);
    pData = pNode->data;
  }

  __cache_unlock(pCacheObj);

  if (pData != NULL) {
    atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
    uDebug("cache:%s, key:%p, %p expireTime is updated in cache, refcnt:%d", pCacheObj->name, key, pData, ref);
  } else {
    atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
    uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
  }

  atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
  return pData;
}

364 365
/**
 * Take an additional reference on an entry, given a payload pointer previously
 * obtained from this cache.
 *
 * @param pCacheObj  cache object
 * @param data       payload pointer returned by the cache
 * @return           `data` itself, or NULL if an argument is NULL or the node
 *                   in front of `data` fails its signature check
 */
void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
  if (pCacheObj == NULL || data == NULL) return NULL;

  // step back from the payload to the node header that precedes it
  SCacheDataNode *pOwner = (SCacheDataNode *)((char *)data - offsetof(SCacheDataNode, data));

  if ((uint64_t)pOwner != pOwner->signature) {
    uError("key: %p the data from cache is invalid", pOwner);
    return NULL;
  }

  int32_t refCnt = T_REF_INC(pOwner);
  uDebug("cache:%s, data: %p acquired by data in cache, refcnt:%d", pCacheObj->name, pOwner->data, refCnt);

  // the caller already holds a reference, so after the increment the count is at least 2
  assert(refCnt >= 2);
  return data;
}

383
/**
 * Move ownership of a cached payload pointer out of a caller's variable.
 *
 * The reference count is not changed; the source pointer is cleared and its
 * former value is returned.
 *
 * @param pCacheObj  cache object
 * @param data       address of the payload pointer to transfer; set to NULL on success
 * @return           the payload pointer previously stored in *data, or NULL if an
 *                   argument is NULL or the node fails its signature check
 */
void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) {
  if (pCacheObj == NULL || data == NULL || (*data) == NULL) return NULL;

  void           *pPayload = *data;
  SCacheDataNode *pOwner = (SCacheDataNode *)((char *)pPayload - offsetof(SCacheDataNode, data));

  if ((uint64_t)pOwner != pOwner->signature) {
    uError("key: %p the data from cache is invalid", pOwner);
    return NULL;
  }

  assert(T_REF_VAL_GET(pOwner) >= 1);

  // the old holder gives up its pointer
  *data = NULL;

  return pPayload;
}
403

404 405
/**
 * Give back a reference obtained from the cache.
 *
 * @param pCacheObj  cache object
 * @param data       address of the payload pointer being released; *data is set to NULL
 *                   once the node has passed its signature check
 * @param _remove    if true, the entry is also taken out of the cache: it is destroyed at
 *                   once when no other reference remains, otherwise it is moved to the trash
 *
 * When the cache was created with extendLifespan and the node is neither in the trash nor
 * being removed, its expire time is pushed to now + lifespan before the reference is dropped.
 */
void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
  if (pCacheObj == NULL || data == NULL || (*data) == NULL ||
      (taosHashGetSize(pCacheObj->pHashTable) + pCacheObj->numOfElemsInTrash == 0)) {
    return;
  }

  size_t offset = offsetof(SCacheDataNode, data);

  SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset);
  if (pNode->signature != (uint64_t)pNode) {
    uError("%p, release invalid cache data", pNode);
    return;
  }

  *data = NULL;

  // note: extend lifespan before dec ref count
  bool inTrashCan = pNode->inTrashCan;

  if (pCacheObj->extendLifespan && (!inTrashCan) && (!_remove)) {
    atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs());
    uDebug("cache:%s data:%p extend life time to %"PRId64 "  before release", pCacheObj->name, pNode->data, pNode->expireTime);
  }

  if (_remove) {
    // nodes are only destroyed under the write lock, so pNode stays valid in this branch
    __cache_wr_lock(pCacheObj);

    int32_t ref = T_REF_DEC(pNode);
    uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, ref);

    /*
     * If it is not referenced by other users, remove it immediately. Otherwise move this node to trashcan wait for all users
     * releasing this resources.
     *
     * NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread
     * that tries to do the same thing.
     */
    if (pNode->inTrashCan) {
      if (ref == 0) {
        assert(pNode->pTNodeHeader->pData == pNode);
        taosRemoveFromTrashCan(pCacheObj, pNode->pTNodeHeader);
      }
    } else {
      if (ref > 0) {
        assert(pNode->pTNodeHeader == NULL);
        taosCacheMoveToTrash(pCacheObj, pNode);
      } else {
        taosCacheReleaseNode(pCacheObj, pNode);
      }
    }

    __cache_unlock(pCacheObj);
  } else {
    // Once the refcount is decreased, pNode may be freed by another thread immediately,
    // so everything the log line needs is read before the decrement.
    const void *pKey = pNode->key;
    const void *pPayload = pNode->data;

    int32_t ref = T_REF_DEC(pNode);
    uDebug("cache:%s, key:%p, %p is released, refcnt:%d, in trashcan:%d", pCacheObj->name, pKey, pPayload, ref,
        inTrashCan);
  }
}

465
/**
 * Drop every entry from the hash table.
 *
 * Entries with no references are destroyed; referenced entries are moved to the trash.
 * The walk stops early if the cache is being deleted. Afterwards the trash is swept for
 * entries whose reference count is 0.
 *
 * @param pCacheObj  cache object; NULL is ignored, as in the other public entry points
 */
void taosCacheEmpty(SCacheObj *pCacheObj) {
  if (pCacheObj == NULL) {
    return;
  }

  SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);

  __cache_wr_lock(pCacheObj);
  while (taosHashIterNext(pIter)) {
    if (pCacheObj->deleting == 1) {
      break;
    }

    SCacheDataNode *pNode = *(SCacheDataNode **) taosHashIterGet(pIter);
    if (T_REF_VAL_GET(pNode) == 0) {
      taosCacheReleaseNode(pCacheObj, pNode);
    } else {
      taosCacheMoveToTrash(pCacheObj, pNode);
    }
  }
  __cache_unlock(pCacheObj);

  taosHashDestroyIter(pIter);
  taosTrashCanEmpty(pCacheObj, false);
}

/**
 * Shut a cache down: flag it as deleting, wait for the refresh worker to exit,
 * then release everything the cache owns. A NULL cache is ignored.
 *
 * @param pCacheObj  cache object to destroy; must not be used afterwards
 */
void taosCacheCleanup(SCacheObj *pCacheObj) {
  if (pCacheObj != NULL) {
    // the refresh worker polls this flag and leaves its loop once it is set
    pCacheObj->deleting = 1;
    pthread_join(pCacheObj->refreshWorker, NULL);

    uInfo("cache:%s will be cleaned up", pCacheObj->name);
    doCleanupDataCache(pCacheObj);
  }
}

/**
 * Allocate a cache node holding a copy of the payload followed by a copy of the key.
 *
 * Layout of the single allocation: node header, `size` payload bytes, `keyLen` key bytes,
 * one zero byte. The trailing zero byte keeps a textual key NUL-terminated, matching the
 * `keyLen + 1` reservation made by the in-place update path in taosUpdateCacheImpl.
 *
 * @param key       key bytes to copy
 * @param keyLen    length of key in bytes
 * @param pData     payload to copy
 * @param size      size of the payload in bytes
 * @param duration  lifespan in milliseconds; expireTime is set to the creation time plus this value
 * @return          the new node (reference count untouched, i.e. zero from calloc), or NULL on allocation failure
 */
SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size,
                                           uint64_t duration) {
  // +1: terminator byte after the key; calloc leaves it zero
  size_t totalSize = size + sizeof(SCacheDataNode) + keyLen + 1;

  SCacheDataNode *pNewNode = calloc(1, totalSize);
  if (pNewNode == NULL) {
    uError("failed to allocate memory, reason:%s", strerror(errno));
    return NULL;
  }

  memcpy(pNewNode->data, pData, size);

  // the key is stored right behind the payload
  pNewNode->key = (char *)pNewNode + sizeof(SCacheDataNode) + size;
  pNewNode->keySize = keyLen;

  memcpy(pNewNode->key, key, keyLen);

  pNewNode->addedTime    = (uint64_t)taosGetTimestampMs();
  pNewNode->lifespan     = duration;
  pNewNode->expireTime   = pNewNode->addedTime + pNewNode->lifespan;
  pNewNode->signature    = (uint64_t)pNewNode;
  pNewNode->size         = (uint32_t)totalSize;

  return pNewNode;
}

/**
 * Put a node at the head of the cache's trash list and mark it as being in the trash.
 *
 * A node that is already in the trash is left alone. If the list element cannot be
 * allocated, an error is logged and the node is left unmarked instead of dereferencing
 * a NULL pointer.
 *
 * @param pCacheObj  cache object
 * @param pNode      node to add to the trash
 */
void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
  if (pNode->inTrashCan) { /* node is already in trash */
    assert(pNode->pTNodeHeader != NULL && pNode->pTNodeHeader->pData == pNode);
    return;
  }

  STrashElem *pElem = calloc(1, sizeof(STrashElem));
  if (pElem == NULL) {
    uError("key:%p, %p failed to move to trash, reason:%s", pNode->key, pNode->data, strerror(errno));
    return;
  }

  pElem->pData = pNode;

  // link in front of the current head
  pElem->next = pCacheObj->pTrash;
  if (pCacheObj->pTrash) {
    pCacheObj->pTrash->prev = pElem;
  }

  pElem->prev = NULL;
  pCacheObj->pTrash = pElem;

  pNode->inTrashCan = true;
  pNode->pTNodeHeader = pElem;
  pCacheObj->numOfElemsInTrash++;

  uDebug("key:%p, %p move to trash, numOfElem in trash:%d", pNode->key, pNode->data, pCacheObj->numOfElemsInTrash);
}

/**
 * Unlink one element from the trash list and destroy both the element and its node.
 *
 * The node's signature is cleared and the user free callback (if any) is run on its
 * payload before the memory is freed. An element whose node fails the signature check
 * is reported and left untouched.
 *
 * @param pCacheObj  cache object
 * @param pElem      trash element to remove; must not be used afterwards
 */
void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem) {
  SCacheDataNode *pVictim = pElem->pData;

  if ((uint64_t)pVictim != pVictim->signature) {
    uError("key:sig:0x%" PRIx64 " %p data has been released, ignore", pVictim->signature, pVictim);
    return;
  }

  pCacheObj->numOfElemsInTrash--;

  STrashElem *pBefore = pElem->prev;
  STrashElem *pAfter = pElem->next;

  if (pBefore == NULL) { /* the element is the list head, so the head moves on */
    pCacheObj->pTrash = pAfter;
  } else {
    pBefore->next = pAfter;
  }

  if (pAfter != NULL) {
    pAfter->prev = pBefore;
  }

  pVictim->signature = 0;
  if (pCacheObj->freeFp) {
    pCacheObj->freeFp(pVictim->data);
  }

  free(pVictim);
  free(pElem);
}

/**
 * Sweep the trash list under the write lock.
 *
 * Every element whose node has a reference count of 0 is removed; with `force` set,
 * every element is removed regardless of its reference count.
 *
 * @param pCacheObj  cache object
 * @param force      remove elements without checking their reference count
 */
void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) {
  __cache_wr_lock(pCacheObj);

  if (pCacheObj->numOfElemsInTrash == 0) {
    // counter says empty: report a non-empty list, then reset the head
    if (pCacheObj->pTrash != NULL) {
      uError("key:inconsistency data in cache, numOfElem in trash:%d", pCacheObj->numOfElemsInTrash);
    }
    pCacheObj->pTrash = NULL;

    __cache_unlock(pCacheObj);
    return;
  }

  for (STrashElem *pCur = pCacheObj->pTrash; pCur != NULL;) {
    T_REF_VAL_CHECK(pCur->pData);

    // break a self-referencing link so the walk terminates
    if (pCur->next == pCur) {
      pCur->next = NULL;
    }

    // fetch the successor first: pCur may be freed below
    STrashElem *pFollowing = pCur->next;

    if (force || (T_REF_VAL_GET(pCur->pData) == 0)) {
      uDebug("key:%p, %p removed from trash. numOfElem in trash:%d", pCur->pData->key, pCur->pData->data,
             pCacheObj->numOfElemsInTrash - 1);
      taosRemoveFromTrashCan(pCacheObj, pCur);
    }

    pCur = pFollowing;
  }

  __cache_unlock(pCacheObj);
}

/**
 * Tear a cache object down completely.
 *
 * Under the write lock, every hash entry with a reference count <= 0 is destroyed and
 * the hash table is cleaned up; entries that are still referenced are only logged.
 * Then the trash is emptied in force mode, the lock is destroyed and the cache object
 * itself (including its name) is freed.
 *
 * @param pCacheObj  cache object to destroy; must not be used afterwards
 */
void doCleanupDataCache(SCacheObj *pCacheObj) {
  __cache_wr_lock(pCacheObj);

  SHashMutableIterator *pWalker = taosHashCreateIter(pCacheObj->pHashTable);
  while (taosHashIterNext(pWalker)) {
    SCacheDataNode *pEntry = *(SCacheDataNode **)taosHashIterGet(pWalker);

    int32_t refs = T_REF_VAL_GET(pEntry);
    if (refs > 0) {
      uDebug("cache:%s key:%p, %p will not remove from cache, refcnt:%d", pCacheObj->name, pEntry->key,
          pEntry->data, T_REF_VAL_GET(pEntry));
    } else {
      taosCacheReleaseNode(pCacheObj, pEntry);
    }
  }
  taosHashDestroyIter(pWalker);

  // todo memory leak if there are object with refcount greater than 0 in hash table?
  taosHashCleanup(pCacheObj->pHashTable);
  __cache_unlock(pCacheObj);

  // force mode: trash elements go regardless of their reference count
  taosTrashCanEmpty(pCacheObj, true);
  __cache_lock_destroy(pCacheObj);

  tfree(pCacheObj->name);
  memset(pCacheObj, 0, sizeof(SCacheObj));
  free(pCacheObj);
}
639

640 641 642 643 644 645
/**
 * Walk the hash table once under the write lock.
 *
 * An entry that has expired relative to `time` and has a reference count <= 0 is
 * destroyed; every other entry is passed to `fp`, when `fp` is given.
 *
 * @param pCacheObj  cache object
 * @param time       timestamp that node expire times are compared against
 * @param fp         optional callback invoked on the payload of each surviving entry
 */
static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_free_fn_t fp) {
  SHashMutableIterator *pWalker = taosHashCreateIter(pCacheObj->pHashTable);

  __cache_wr_lock(pCacheObj);
  while (taosHashIterNext(pWalker)) {
    SCacheDataNode *pEntry = *(SCacheDataNode **)taosHashIterGet(pWalker);

    if (pEntry->expireTime >= time || T_REF_VAL_GET(pEntry) > 0) {
      // still alive or still referenced: keep it and let the callback see it
      if (fp) {
        fp(pEntry->data);
      }
    } else {
      taosCacheReleaseNode(pCacheObj, pEntry);
    }
  }

  __cache_unlock(pCacheObj);

  taosHashDestroyIter(pWalker);
}

void* taosCacheTimedRefresh(void *handle) {
  SCacheObj* pCacheObj = handle;
664
  if (pCacheObj == NULL) {
665
    uDebug("object is destroyed. no refresh retry");
666 667 668 669 670 671 672 673 674 675 676 677
    return NULL;
  }

  const int32_t SLEEP_DURATION = 500; //500 ms
  int64_t totalTick = pCacheObj->refreshTime / SLEEP_DURATION;

  int64_t count = 0;
  while(1) {
    taosMsleep(500);

    // check if current cache object will be deleted every 500ms.
    if (pCacheObj->deleting) {
678
      uDebug("%s refresh threads quit", pCacheObj->name);
679 680 681 682 683 684 685 686 687
      break;
    }

    if (++count < totalTick) {
      continue;
    }

    // reset the count value
    count = 0;
H
Haojun Liao 已提交
688 689
    size_t elemInHash = taosHashGetSize(pCacheObj->pHashTable);
    if (elemInHash + pCacheObj->numOfElemsInTrash == 0) {
690 691 692 693 694
      continue;
    }

    pCacheObj->statistics.refreshCount++;

H
Haojun Liao 已提交
695 696
    // refresh data in hash table
    if (elemInHash > 0) {
697 698
      int64_t now = taosGetTimestampMs();
      doCacheRefresh(pCacheObj, now, NULL);
H
Haojun Liao 已提交
699
    }
700 701 702 703 704

    taosTrashCanEmpty(pCacheObj, false);
  }

  return NULL;
dengyihao's avatar
dengyihao 已提交
705
}
706 707 708 709 710 711 712 713 714

/**
 * Run one refresh pass over the hash table right now, using the current time as the
 * expiry threshold. A NULL cache is ignored.
 *
 * @param pCacheObj  cache object
 * @param fp         optional callback invoked on the payload of each entry that is not destroyed
 */
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp) {
  if (pCacheObj != NULL) {
    doCacheRefresh(pCacheObj, taosGetTimestampMs(), fp);
  }
}