tcache.c 22.2 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16 17 18
#define _DEFAULT_SOURCE
#include "os.h"
#include "tulog.h"
H
hzcheng 已提交
19 20
#include "ttimer.h"
#include "tutil.h"
S
slguan 已提交
21 22 23
#include "tcache.h"
#include "hash.h"
#include "hashfunc.h"
H
hzcheng 已提交
24

25
static FORCE_INLINE void __cache_wr_lock(SCacheObj *pCacheObj) {
26
#if defined(LINUX)
27
  pthread_rwlock_wrlock(&pCacheObj->lock);
28
#else
29
  pthread_mutex_lock(&pCacheObj->lock);
30 31 32
#endif
}

33
static FORCE_INLINE void __cache_unlock(SCacheObj *pCacheObj) {
34
#if defined(LINUX)
35
  pthread_rwlock_unlock(&pCacheObj->lock);
36
#else
37
  pthread_mutex_unlock(&pCacheObj->lock);
38 39 40
#endif
}

41
static FORCE_INLINE int32_t __cache_lock_init(SCacheObj *pCacheObj) {
42
#if defined(LINUX)
43
  return pthread_rwlock_init(&pCacheObj->lock, NULL);
44
#else
45
  return pthread_mutex_init(&pCacheObj->lock, NULL);
46 47 48
#endif
}

49
static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pCacheObj) {
50
#if defined(LINUX)
51
  pthread_rwlock_destroy(&pCacheObj->lock);
52
#else
53
  pthread_mutex_destroy(&pCacheObj->lock);
54 55 56
#endif
}

H
hzcheng 已提交
57 58 59 60 61 62
/**
 * @param key      key of object for hash, usually a null-terminated string
 * @param keyLen   length of key
 * @param pData    actually data. required a consecutive memory block, no pointer is allowed
 *                 in pData. Pointer copy causes memory access error.
 * @param size     size of block
63 64
 * @param lifespan total survial expiredTime from now
 * @return         SCacheDataNode
H
hzcheng 已提交
65
 */
66
static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration);
H
hzcheng 已提交
67 68

/**
69
 * addedTime object node into trash, and this object is closed for referencing if it is addedTime to trash
H
hzcheng 已提交
70
 * It will be removed until the pNode->refCount == 0
71
 * @param pCacheObj    Cache object
H
hzcheng 已提交
72 73
 * @param pNode   Cache slot object
 */
H
Haojun Liao 已提交
74
static void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheDataNode *pNode);
75

H
hzcheng 已提交
76 77 78
/**
 * remove nodes in trash with refCount == 0 in cache
 * @param pNode
79
 * @param pCacheObj
H
hzcheng 已提交
80 81 82
 * @param force   force model, if true, remove data in trash without check refcount.
 *                may cause corruption. So, forece model only applys before cache is closed
 */
H
Haojun Liao 已提交
83
static void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force);
H
hzcheng 已提交
84 85 86

/**
 * release node
87
 * @param pCacheObj      cache object
H
Haojun Liao 已提交
88
 * @param pNode          data node
H
hzcheng 已提交
89
 */
90
static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
H
hzcheng 已提交
91
  if (pNode->signature != (uint64_t)pNode) {
S
slguan 已提交
92
    uError("key:%s, %p data is invalid, or has been released", pNode->key, pNode);
H
hzcheng 已提交
93 94
    return;
  }
95

96
  pCacheObj->totalSize -= pNode->size;
S
Shengliang Guan 已提交
97
  int32_t size = (int32_t)taosHashGetSize(pCacheObj->pHashTable);
H
Haojun Liao 已提交
98 99
  assert(size > 0);

100
  uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, totalNum:%d size:%" PRId64 "bytes",
H
Haojun Liao 已提交
101
         pCacheObj->name, pNode->key, pNode->data, pNode->size, size - 1, pCacheObj->totalSize);
H
Haojun Liao 已提交
102 103 104 105

  if (pCacheObj->freeFp) {
    pCacheObj->freeFp(pNode->data);
  }
106

H
hzcheng 已提交
107 108 109
  free(pNode);
}

H
Haojun Liao 已提交
110
static FORCE_INLINE STrashElem* doRemoveElemInTrashcan(SCacheObj* pCacheObj, STrashElem *pElem) {
H
Haojun Liao 已提交
111
  if (pElem->pData->signature != (uint64_t) pElem->pData) {
H
Haojun Liao 已提交
112
    uWarn("key:sig:0x%" PRIx64 " %p data has been released, ignore", pElem->pData->signature, pElem->pData);
H
Haojun Liao 已提交
113
    return NULL;
H
Haojun Liao 已提交
114 115
  }

H
Haojun Liao 已提交
116 117
  STrashElem* next = pElem->next;

H
Haojun Liao 已提交
118 119 120 121 122 123 124
  pCacheObj->numOfElemsInTrash--;
  if (pElem->prev) {
    pElem->prev->next = pElem->next;
  } else { // pnode is the header, update header
    pCacheObj->pTrash = pElem->next;
  }

H
Haojun Liao 已提交
125 126
  if (next) {
    next->prev = pElem->prev;
H
Haojun Liao 已提交
127
  }
H
Haojun Liao 已提交
128 129 130 131 132 133

  if (pCacheObj->numOfElemsInTrash == 0) {
    assert(pCacheObj->pTrash == NULL);
  }

  return next;
H
Haojun Liao 已提交
134 135 136 137 138 139 140 141 142 143
}

static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj* pCacheObj, STrashElem *pElem) {
  if (pCacheObj->freeFp) {
    pCacheObj->freeFp(pElem->pData->data);
  }

  free(pElem->pData);
  free(pElem);
}
H
hzcheng 已提交
144

145 146 147 148 149
/**
 * do cleanup the taos cache
 * @param pCacheObj
 */
static void doCleanupDataCache(SCacheObj *pCacheObj);
150

H
hzcheng 已提交
151
/**
152
 * refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime
H
hzcheng 已提交
153 154
 * @param handle   Cache object handle
 */
H
Haojun Liao 已提交
155
static void* taosCacheTimedRefresh(void *handle);
H
hjxilinx 已提交
156

157
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn, const char* cacheName) {
H
Haojun Liao 已提交
158
  if (refreshTimeInSeconds <= 0) {
H
hzcheng 已提交
159 160
    return NULL;
  }
161
  
162 163
  SCacheObj *pCacheObj = (SCacheObj *)calloc(1, sizeof(SCacheObj));
  if (pCacheObj == NULL) {
S
slguan 已提交
164
    uError("failed to allocate memory, reason:%s", strerror(errno));
H
hzcheng 已提交
165 166
    return NULL;
  }
167
  
H
Haojun Liao 已提交
168
  pCacheObj->pHashTable = taosHashInit(4096, taosGetDefaultHashFunction(keyType), false, HASH_ENTRY_LOCK);
H
Haojun Liao 已提交
169
  pCacheObj->name = strdup(cacheName);
170 171
  if (pCacheObj->pHashTable == NULL) {
    free(pCacheObj);
S
slguan 已提交
172
    uError("failed to allocate memory, reason:%s", strerror(errno));
H
hzcheng 已提交
173 174
    return NULL;
  }
175
  
H
Haojun Liao 已提交
176
  // set free cache node callback function
H
Haojun Liao 已提交
177 178 179
  pCacheObj->freeFp = fn;
  pCacheObj->refreshTime = refreshTimeInSeconds * 1000;
  pCacheObj->extendLifespan = extendLifespan;
180

181 182 183
  if (__cache_lock_init(pCacheObj) != 0) {
    taosHashCleanup(pCacheObj->pHashTable);
    free(pCacheObj);
184
    
S
slguan 已提交
185
    uError("failed to init lock, reason:%s", strerror(errno));
H
hzcheng 已提交
186 187
    return NULL;
  }
188

S
Shengliang Guan 已提交
189
  pthread_attr_t thattr;
190 191 192
  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);

193
  pthread_create(&pCacheObj->refreshWorker, &thattr, taosCacheTimedRefresh, pCacheObj);
194 195

  pthread_attr_destroy(&thattr);
196
  return pCacheObj;
197
}
H
hzcheng 已提交
198

199
void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int durationMS) {
200
  if (pCacheObj == NULL || pCacheObj->pHashTable == NULL || pCacheObj->deleting == 1) {
201 202
    return NULL;
  }
H
Haojun Liao 已提交
203

204
  SCacheDataNode *pNode1 = taosCreateCacheNode(key, keyLen, pData, dataSize, durationMS);
H
Haojun Liao 已提交
205 206 207 208 209
  if (pNode1 == NULL) {
    uError("cache:%s, key:%p, failed to added into cache, out of memory", pCacheObj->name, key);
    return NULL;
  }

H
Haojun Liao 已提交
210
  T_REF_INC(pNode1);
H
Haojun Liao 已提交
211

H
Haojun Liao 已提交
212 213
  int32_t succ = taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *));
  if (succ == 0) {
H
Haojun Liao 已提交
214
    atomic_add_fetch_64(&pCacheObj->totalSize, pNode1->size);
H
Haojun Liao 已提交
215
    uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64
H
Haojun Liao 已提交
216
           ", totalNum:%d totalSize:%" PRId64 "bytes size:%" PRId64 "bytes",
H
Haojun Liao 已提交
217 218
           pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime,
           (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, (int64_t)dataSize);
H
Haojun Liao 已提交
219 220
  } else {  // duplicated key exists
    while (1) {
H
Haojun Liao 已提交
221
      SCacheDataNode* p = NULL;
H
Haojun Liao 已提交
222
      int32_t ret = taosHashRemoveWithData(pCacheObj->pHashTable, key, keyLen, (void*) &p, sizeof(void*));
H
Haojun Liao 已提交
223 224

      // add to trashcan
H
Haojun Liao 已提交
225 226 227 228 229 230
      if (ret == 0) {
        if (T_REF_VAL_GET(p) == 0) {
          if (pCacheObj->freeFp) {
            pCacheObj->freeFp(p->data);
          }

S
TD-1848  
Shengliang Guan 已提交
231
          tfree(p);
H
Haojun Liao 已提交
232
        } else {
H
Haojun Liao 已提交
233
          taosAddToTrashcan(pCacheObj, p);
H
Haojun Liao 已提交
234
          uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode1->data, p->data);
H
Haojun Liao 已提交
235 236
        }
      }
H
Haojun Liao 已提交
237

H
Haojun Liao 已提交
238
      assert(T_REF_VAL_GET(pNode1) == 1);
239

H
Haojun Liao 已提交
240
      ret = taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *));
H
Haojun Liao 已提交
241 242
      if (ret == 0) {
        atomic_add_fetch_64(&pCacheObj->totalSize, pNode1->size);
243

H
Haojun Liao 已提交
244
        uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64
H
Haojun Liao 已提交
245 246 247
               ", totalNum:%d totalSize:%" PRId64 "bytes size:%" PRId64 "bytes",
               pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime,
               (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, (int64_t)dataSize);
H
Haojun Liao 已提交
248

H
Haojun Liao 已提交
249
        return pNode1->data;
H
Haojun Liao 已提交
250

H
Haojun Liao 已提交
251 252 253 254
      } else {
        // failed, try again
      }
    }
H
Haojun Liao 已提交
255 256 257
  }

  return pNode1->data;
H
hzcheng 已提交
258 259
}

H
Haojun Liao 已提交
260 261 262 263
static void incRefFn(void* ptNode) {
  assert(ptNode != NULL);

  SCacheDataNode** p = (SCacheDataNode**) ptNode;
H
Haojun Liao 已提交
264
  assert(T_REF_VAL_GET(*p) >= 0);
H
Haojun Liao 已提交
265

H
Haojun Liao 已提交
266 267 268 269
  int32_t ret = T_REF_INC(*p);
  assert(ret > 0);
}

H
Haojun Liao 已提交
270
void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) {
271 272 273 274 275 276
  if (pCacheObj == NULL || pCacheObj->deleting == 1) {
    return NULL;
  }

  if (taosHashGetSize(pCacheObj->pHashTable) == 0) {
    atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
277
    return NULL;
278
  }
H
Haojun Liao 已提交
279

H
Haojun Liao 已提交
280
  SCacheDataNode* ptNode = NULL;
281
  taosHashGetClone(pCacheObj->pHashTable, key, keyLen, incRefFn, &ptNode, sizeof(void*));
S
Shengliang Guan 已提交
282

H
Haojun Liao 已提交
283
  void* pData = (ptNode != NULL)? ptNode->data:NULL;
S
Shengliang Guan 已提交
284 285

  if (pData != NULL) {
286
    atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
H
Haojun Liao 已提交
287
    uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData, T_REF_VAL_GET(ptNode));
288
  } else {
289
    atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
H
Haojun Liao 已提交
290
    uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
291
  }
S
Shengliang Guan 已提交
292

293
  atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
S
Shengliang Guan 已提交
294
  return pData;
295 296
}

297 298
void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
  if (pCacheObj == NULL || data == NULL) return NULL;
299
  
300 301
  size_t          offset = offsetof(SCacheDataNode, data);
  SCacheDataNode *ptNode = (SCacheDataNode *)((char *)data - offset);
302
  
303
  if (ptNode->signature != (uint64_t)ptNode) {
H
Haojun Liao 已提交
304
    uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode);
305 306
    return NULL;
  }
307

308
  int32_t ref = T_REF_INC(ptNode);
H
Haojun Liao 已提交
309
  uDebug("cache:%s, data: %p acquired by data in cache, refcnt:%d", pCacheObj->name, ptNode->data, ref);
H
Haojun Liao 已提交
310

311 312 313 314 315
  // the data if referenced by at least one object, so the reference count must be greater than the value of 2.
  assert(ref >= 2);
  return data;
}

316
void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) {
H
Haojun Liao 已提交
317
  if (pCacheObj == NULL || data == NULL || (*data) == NULL) return NULL;
318
  
319 320
  size_t          offset = offsetof(SCacheDataNode, data);
  SCacheDataNode *ptNode = (SCacheDataNode *)((char *)(*data) - offset);
321
  
322
  if (ptNode->signature != (uint64_t)ptNode) {
H
Haojun Liao 已提交
323
    uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode);
324 325 326
    return NULL;
  }
  
327
  assert(T_REF_VAL_GET(ptNode) >= 1);
328
  
329
  char *d = *data;
330 331 332 333 334
  
  // clear its reference to old area
  *data = NULL;
  
  return d;
H
hzcheng 已提交
335
}
336

337
void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
H
Haojun Liao 已提交
338
  if (pCacheObj == NULL) {
H
Haojun Liao 已提交
339 340 341
    return;
  }

H
Haojun Liao 已提交
342
  if ((*data) == NULL) {
H
Haojun Liao 已提交
343
    uError("cache:%s, NULL data to release", pCacheObj->name);
344 345
    return;
  }
H
Haojun Liao 已提交
346 347 348 349 350


  // The operation of removal from hash table and addition to trashcan is not an atomic operation,
  // therefore the check for the empty of both the hash table and the trashcan has a race condition.
  // It happens when there is only one object in the cache, and two threads which has referenced this object
H
Haojun Liao 已提交
351
  // start to free the it simultaneously [TD-1569].
352 353 354 355
  size_t offset = offsetof(SCacheDataNode, data);
  
  SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset);
  if (pNode->signature != (uint64_t)pNode) {
H
Haojun Liao 已提交
356
    uError("cache:%s, %p, release invalid cache data", pCacheObj->name, pNode);
357 358
    return;
  }
359

360
  *data = NULL;
361

362
  // note: extend lifespan before dec ref count
H
Haojun Liao 已提交
363
  bool inTrashcan = pNode->inTrashcan;
H
Haojun Liao 已提交
364

H
Haojun Liao 已提交
365
  if (pCacheObj->extendLifespan && (!inTrashcan) && (!_remove)) {
366
    atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs());
H
Haojun Liao 已提交
367
    uDebug("cache:%s data:%p extend expire time: %"PRId64, pCacheObj->name, pNode->data, pNode->expireTime);
368
  }
H
Haojun Liao 已提交
369

H
Haojun Liao 已提交
370 371
  if (_remove) {
    // NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
H
Haojun Liao 已提交
372 373 374
    char* key = pNode->key;
    char* d = pNode->data;

375
    int32_t ref = T_REF_VAL_GET(pNode);
H
Haojun Liao 已提交
376
    uDebug("cache:%s, key:%p, %p is released, refcnt:%d, in trashcan:%d", pCacheObj->name, key, d, ref - 1, inTrashcan);
H
Haojun Liao 已提交
377 378 379 380 381 382 383 384

    /*
     * If it is not referenced by other users, remove it immediately. Otherwise move this node to trashcan wait for all users
     * releasing this resources.
     *
     * NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread
     * that tries to do the same thing.
     */
H
Haojun Liao 已提交
385
    if (inTrashcan) {
H
Haojun Liao 已提交
386
      ref = T_REF_VAL_GET(pNode);
387

H
Haojun Liao 已提交
388 389 390
      if (ref == 1) {
        // If it is the last ref, remove it from trashcan linked-list first, and then destroy it.Otherwise, it may be
        // destroyed by refresh worker if decrease ref count before removing it from linked-list.
H
Haojun Liao 已提交
391
        assert(pNode->pTNodeHeader->pData == pNode);
H
Haojun Liao 已提交
392

H
Haojun Liao 已提交
393 394 395 396
        __cache_wr_lock(pCacheObj);
        doRemoveElemInTrashcan(pCacheObj, pNode->pTNodeHeader);
        __cache_unlock(pCacheObj);

H
Haojun Liao 已提交
397 398 399
        ref = T_REF_DEC(pNode);
        assert(ref == 0);

H
Haojun Liao 已提交
400
        doDestroyTrashcanElem(pCacheObj, pNode->pTNodeHeader);
H
Haojun Liao 已提交
401 402
      } else {
        ref = T_REF_DEC(pNode);
H
Haojun Liao 已提交
403
        assert(ref >= 0);
H
Haojun Liao 已提交
404 405
      }
    } else {
406 407
      // NOTE: remove it from hash in the first place, otherwise, the pNode may have been released by other thread
      // when reaches here.
H
Haojun Liao 已提交
408 409
      SCacheDataNode *p = NULL;
      int32_t ret = taosHashRemoveWithData(pCacheObj->pHashTable, pNode->key, pNode->keySize, &p, sizeof(void *));
410
      ref = T_REF_DEC(pNode);
H
Haojun Liao 已提交
411 412 413 414

      // successfully remove from hash table, if failed, this node must have been move to trash already, do nothing.
      // note that the remove operation can be executed only once.
      if (ret == 0) {
H
Haojun Liao 已提交
415
        if (p != pNode) {
H
Haojun Liao 已提交
416
          uDebug( "cache:%s, key:%p, successfully removed a new entry:%p, refcnt:%d, prev entry:%p has been removed by "
H
Haojun Liao 已提交
417 418 419
              "others already", pCacheObj->name, pNode->key, p->data, T_REF_VAL_GET(p), pNode->data);

          assert(p->pTNodeHeader == NULL);
H
Haojun Liao 已提交
420
          taosAddToTrashcan(pCacheObj, p);
H
Haojun Liao 已提交
421
        } else {
H
Haojun Liao 已提交
422
          uDebug("cache:%s, key:%p, %p successfully removed from hash table, refcnt:%d", pCacheObj->name, pNode->key,
H
Haojun Liao 已提交
423 424 425
                 pNode->data, ref);
          if (ref > 0) {
            assert(pNode->pTNodeHeader == NULL);
426

H
Haojun Liao 已提交
427
            taosAddToTrashcan(pCacheObj, pNode);
H
Haojun Liao 已提交
428 429
          } else {  // ref == 0
            atomic_sub_fetch_64(&pCacheObj->totalSize, pNode->size);
H
Haojun Liao 已提交
430

H
Haojun Liao 已提交
431
            int32_t size = (int32_t)taosHashGetSize(pCacheObj->pHashTable);
H
Haojun Liao 已提交
432
            uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, totalNum:%d size:%" PRId64 "bytes",
H
Haojun Liao 已提交
433
                   pCacheObj->name, pNode->key, pNode->data, pNode->size, size, pCacheObj->totalSize);
H
Haojun Liao 已提交
434

H
Haojun Liao 已提交
435 436 437
            if (pCacheObj->freeFp) {
              pCacheObj->freeFp(pNode->data);
            }
H
Haojun Liao 已提交
438

H
Haojun Liao 已提交
439
            free(pNode);
H
Haojun Liao 已提交
440
          }
H
Haojun Liao 已提交
441
        }
442
      } else {
H
Haojun Liao 已提交
443
        uDebug("cache:%s, key:%p, %p has been removed from hash table by others already, refcnt:%d",
H
Haojun Liao 已提交
444
               pCacheObj->name, pNode->key, pNode->data, ref);
H
Haojun Liao 已提交
445
      }
H
Haojun Liao 已提交
446 447
    }

448
  } else {
H
Haojun Liao 已提交
449
    // NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
H
Haojun Liao 已提交
450 451 452
    char* key = pNode->key;
    char* p = pNode->data;

H
Haojun Liao 已提交
453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472
//    int32_t ref = T_REF_VAL_GET(pNode);
//
//    if (ref == 1 && inTrashcan) {
//      // If it is the last ref, remove it from trashcan linked-list first, and then destroy it.Otherwise, it may be
//      // destroyed by refresh worker if decrease ref count before removing it from linked-list.
//      assert(pNode->pTNodeHeader->pData == pNode);
//
//      __cache_wr_lock(pCacheObj);
//      doRemoveElemInTrashcan(pCacheObj, pNode->pTNodeHeader);
//      __cache_unlock(pCacheObj);
//
//      ref = T_REF_DEC(pNode);
//      assert(ref == 0);
//
//      doDestroyTrashcanElem(pCacheObj, pNode->pTNodeHeader);
//    } else {
//      ref = T_REF_DEC(pNode);
//      assert(ref >= 0);
//    }

H
Haojun Liao 已提交
473
    int32_t ref = T_REF_DEC(pNode);
H
Haojun Liao 已提交
474
    uDebug("cache:%s, key:%p, %p released, refcnt:%d, data in trashcan:%d", pCacheObj->name, key, p, ref, inTrashcan);
H
Haojun Liao 已提交
475 476
  }
}
H
Haojun Liao 已提交
477

H
Haojun Liao 已提交
478 479 480 481 482 483 484 485 486 487 488 489 490 491 492
typedef struct SHashTravSupp {
  SCacheObj* pCacheObj;
  int64_t    time;
  __cache_free_fn_t fp;
} SHashTravSupp;

static bool travHashTableEmptyFn(void* param, void* data) {
  SHashTravSupp* ps = (SHashTravSupp*) param;
  SCacheObj* pCacheObj= ps->pCacheObj;

  SCacheDataNode *pNode = *(SCacheDataNode **) data;

  if (T_REF_VAL_GET(pNode) == 0) {
    taosCacheReleaseNode(pCacheObj, pNode);
  } else { // do add to trashcan
H
Haojun Liao 已提交
493
    taosAddToTrashcan(pCacheObj, pNode);
494
  }
H
Haojun Liao 已提交
495 496 497

  // this node should be remove from hash table
  return false;
498 499
}

500
void taosCacheEmpty(SCacheObj *pCacheObj) {
H
Haojun Liao 已提交
501 502 503
  SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()};

  taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup);
H
Haojun Liao 已提交
504
  taosTrashcanEmpty(pCacheObj, false);
505 506 507 508 509 510
}

void taosCacheCleanup(SCacheObj *pCacheObj) {
  if (pCacheObj == NULL) {
    return;
  }
511 512

  pCacheObj->deleting = 1;
S
TD-2805  
Shengliang Guan 已提交
513 514 515
  if (taosCheckPthreadValid(pCacheObj->refreshWorker)) {
    pthread_join(pCacheObj->refreshWorker, NULL);
  }
516

H
Haojun Liao 已提交
517
  uInfo("cache:%s will be cleaned up", pCacheObj->name);
518 519 520
  doCleanupDataCache(pCacheObj);
}

H
Haojun Liao 已提交
521
SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration) {
522
  size_t totalSize = size + sizeof(SCacheDataNode) + keyLen;
523 524 525 526 527 528 529 530 531 532

  SCacheDataNode *pNewNode = calloc(1, totalSize);
  if (pNewNode == NULL) {
    uError("failed to allocate memory, reason:%s", strerror(errno));
    return NULL;
  }

  memcpy(pNewNode->data, pData, size);

  pNewNode->key = (char *)pNewNode + sizeof(SCacheDataNode) + size;
S
Shengliang Guan 已提交
533
  pNewNode->keySize = (uint16_t)keyLen;
534 535 536

  memcpy(pNewNode->key, key, keyLen);

H
Haojun Liao 已提交
537 538
  pNewNode->addedTime    = (uint64_t)taosGetTimestampMs();
  pNewNode->lifespan     = duration;
539
  pNewNode->expireTime   = pNewNode->addedTime + pNewNode->lifespan;
H
Haojun Liao 已提交
540 541
  pNewNode->signature    = (uint64_t)pNewNode;
  pNewNode->size         = (uint32_t)totalSize;
542 543 544 545

  return pNewNode;
}

H
Haojun Liao 已提交
546
void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
H
Haojun Liao 已提交
547
  if (pNode->inTrashcan) { /* node is already in trash */
548
    assert(pNode->pTNodeHeader != NULL && pNode->pTNodeHeader->pData == pNode);
549 550 551
    return;
  }

552
  __cache_wr_lock(pCacheObj);
553 554
  STrashElem *pElem = calloc(1, sizeof(STrashElem));
  pElem->pData = pNode;
555 556
  pElem->prev = NULL;
  pElem->next = NULL;
H
Haojun Liao 已提交
557
  pNode->inTrashcan = true;
558
  pNode->pTNodeHeader = pElem;
H
Haojun Liao 已提交
559

560 561 562 563 564 565 566
  pElem->next = pCacheObj->pTrash;
  if (pCacheObj->pTrash) {
    pCacheObj->pTrash->prev = pElem;
  }

  pCacheObj->pTrash = pElem;
  pCacheObj->numOfElemsInTrash++;
H
Haojun Liao 已提交
567
  __cache_unlock(pCacheObj);
568

569 570
  uDebug("cache:%s key:%p, %p move to trashcan, pTrashElem:%p, numOfElem in trashcan:%d", pCacheObj->name, pNode->key,
         pNode->data, pElem, pCacheObj->numOfElemsInTrash);
571 572
}

H
Haojun Liao 已提交
573
void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) {
574 575 576 577
  __cache_wr_lock(pCacheObj);

  if (pCacheObj->numOfElemsInTrash == 0) {
    if (pCacheObj->pTrash != NULL) {
H
Haojun Liao 已提交
578
      pCacheObj->pTrash = NULL;
H
Haojun Liao 已提交
579
      uError("cache:%s, key:inconsistency data in cache, numOfElem in trashcan:%d", pCacheObj->name, pCacheObj->numOfElemsInTrash);
580 581 582 583
    }

    __cache_unlock(pCacheObj);
    return;
H
hjxilinx 已提交
584
  }
585

H
Haojun Liao 已提交
586 587 588
  const char* stat[] = {"false", "true"};
  uDebug("cache:%s start to cleanup trashcan, numOfElem in trashcan:%d, free:%s", pCacheObj->name,
      pCacheObj->numOfElemsInTrash, (force? stat[1]:stat[0]));
589

H
Haojun Liao 已提交
590
  STrashElem *pElem = pCacheObj->pTrash;
591 592
  while (pElem) {
    T_REF_VAL_CHECK(pElem->pData);
H
Haojun Liao 已提交
593
    assert(pElem->next != pElem && pElem->prev != pElem);
594 595

    if (force || (T_REF_VAL_GET(pElem->pData) == 0)) {
H
Haojun Liao 已提交
596
      uDebug("cache:%s, key:%p, %p removed from trashcan. numOfElem in trashcan:%d", pCacheObj->name, pElem->pData->key, pElem->pData->data,
597 598
             pCacheObj->numOfElemsInTrash - 1);

H
Haojun Liao 已提交
599 600 601
      doRemoveElemInTrashcan(pCacheObj, pElem);
      doDestroyTrashcanElem(pCacheObj, pElem);
      pElem = pCacheObj->pTrash;
602 603 604 605 606 607 608 609 610
    } else {
      pElem = pElem->next;
    }
  }

  __cache_unlock(pCacheObj);
}

void doCleanupDataCache(SCacheObj *pCacheObj) {
H
Haojun Liao 已提交
611 612
  SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()};
  taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup);
613

H
Haojun Liao 已提交
614 615
  // todo memory leak if there are object with refcount greater than 0 in hash table?
  taosHashCleanup(pCacheObj->pHashTable);
Y
TD-4241  
yihaoDeng 已提交
616
  taosTrashcanEmpty(pCacheObj, false);
H
Haojun Liao 已提交
617

618
  __cache_lock_destroy(pCacheObj);
H
Haojun Liao 已提交
619
  
S
TD-1848  
Shengliang Guan 已提交
620
  tfree(pCacheObj->name);
621 622
  memset(pCacheObj, 0, sizeof(SCacheObj));
  free(pCacheObj);
623
}
624

H
Haojun Liao 已提交
625 626 627
bool travHashTableFn(void* param, void* data) {
  SHashTravSupp* ps = (SHashTravSupp*) param;
  SCacheObj*     pCacheObj= ps->pCacheObj;
628

H
Haojun Liao 已提交
629
  SCacheDataNode* pNode = *(SCacheDataNode **) data;
S
Shengliang Guan 已提交
630
  if ((int64_t)pNode->expireTime < ps->time && T_REF_VAL_GET(pNode) <= 0) {
H
Haojun Liao 已提交
631
    taosCacheReleaseNode(pCacheObj, pNode);
H
Haojun Liao 已提交
632

H
Haojun Liao 已提交
633 634 635
    // this node should be remove from hash table
    return false;
  }
636

H
Haojun Liao 已提交
637 638
  if (ps->fp) {
    (ps->fp)(pNode->data);
639 640
  }

H
Haojun Liao 已提交
641 642 643 644 645 646
  // do not remove element in hash table
  return true;
}

static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_free_fn_t fp) {
  assert(pCacheObj != NULL);
647

H
Haojun Liao 已提交
648 649
  SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = fp, .time = time};
  taosHashCondTraverse(pCacheObj->pHashTable, travHashTableFn, &sup);
650 651 652 653
}

void* taosCacheTimedRefresh(void *handle) {
  SCacheObj* pCacheObj = handle;
654
  if (pCacheObj == NULL) {
H
Haojun Liao 已提交
655
    uDebug("object is destroyed. no refresh retry");
656 657 658 659 660 661 662 663
    return NULL;
  }

  const int32_t SLEEP_DURATION = 500; //500 ms
  int64_t totalTick = pCacheObj->refreshTime / SLEEP_DURATION;

  int64_t count = 0;
  while(1) {
H
Haojun Liao 已提交
664 665
    taosMsleep(500);

666 667
    // check if current cache object will be deleted every 500ms.
    if (pCacheObj->deleting) {
H
Haojun Liao 已提交
668
      uDebug("%s refresh threads quit", pCacheObj->name);
669 670 671 672 673 674 675 676 677
      break;
    }

    if (++count < totalTick) {
      continue;
    }

    // reset the count value
    count = 0;
H
Haojun Liao 已提交
678 679
    size_t elemInHash = taosHashGetSize(pCacheObj->pHashTable);
    if (elemInHash + pCacheObj->numOfElemsInTrash == 0) {
680 681 682
      continue;
    }

683
    uDebug("%s refresh thread timed scan", pCacheObj->name);
684 685
    pCacheObj->statistics.refreshCount++;

H
Haojun Liao 已提交
686 687
    // refresh data in hash table
    if (elemInHash > 0) {
688 689
      int64_t now = taosGetTimestampMs();
      doCacheRefresh(pCacheObj, now, NULL);
H
Haojun Liao 已提交
690
    }
691

H
Haojun Liao 已提交
692
    taosTrashcanEmpty(pCacheObj, false);
693 694 695
  }

  return NULL;
dengyihao's avatar
dengyihao 已提交
696
}
697 698 699 700 701 702 703 704 705

void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp) {
  if (pCacheObj == NULL) {
    return;
  }

  int64_t now = taosGetTimestampMs();
  doCacheRefresh(pCacheObj, now, fp);
}