tcache.c 24.8 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16 17
#define _DEFAULT_SOURCE
#include "os.h"
S
log  
Shengliang Guan 已提交
18
#include "tlog.h"
H
hzcheng 已提交
19 20
#include "ttimer.h"
#include "tutil.h"
S
slguan 已提交
21
#include "tcache.h"
H
hzcheng 已提交
22

23
static FORCE_INLINE void __cache_wr_lock(SCacheObj *pCacheObj) {
24
#if defined(LINUX)
25
  pthread_rwlock_wrlock(&pCacheObj->lock);
26
#else
27
  pthread_mutex_lock(&pCacheObj->lock);
28 29 30
#endif
}

31
static FORCE_INLINE void __cache_unlock(SCacheObj *pCacheObj) {
32
#if defined(LINUX)
33
  pthread_rwlock_unlock(&pCacheObj->lock);
34
#else
35
  pthread_mutex_unlock(&pCacheObj->lock);
36 37 38
#endif
}

39
static FORCE_INLINE int32_t __cache_lock_init(SCacheObj *pCacheObj) {
40
#if defined(LINUX)
41
  return pthread_rwlock_init(&pCacheObj->lock, NULL);
42
#else
43
  return pthread_mutex_init(&pCacheObj->lock, NULL);
44 45 46
#endif
}

47
static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pCacheObj) {
48
#if defined(LINUX)
49
  pthread_rwlock_destroy(&pCacheObj->lock);
50
#else
51
  pthread_mutex_destroy(&pCacheObj->lock);
52 53 54
#endif
}

55 56 57 58 59 60 61 62 63 64 65 66
/**
 * do cleanup the taos cache
 * @param pCacheObj
 */
static void doCleanupDataCache(SCacheObj *pCacheObj);

/**
 * refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime
 * @param handle   Cache object handle
 */
static void* taosCacheTimedRefresh(void *handle);

H
Haojun Liao 已提交
67
static pthread_t cacheRefreshWorker   = {0};
68 69 70
static pthread_once_t cacheThreadInit = PTHREAD_ONCE_INIT;
static pthread_mutex_t guard          = PTHREAD_MUTEX_INITIALIZER;
static SArray* pCacheArrayList        = NULL;
71
static bool    stopRefreshWorker      = false;
72 73
static bool    refreshWorkerNormalStopped       = false;
static bool    refreshWorkerUnexpectedStopped   = false;
74

H
Haojun Liao 已提交
75
static void doInitRefreshThread(void) {
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
  pCacheArrayList = taosArrayInit(4, POINTER_BYTES);

  pthread_attr_t thattr;
  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);

  pthread_create(&cacheRefreshWorker, &thattr, taosCacheTimedRefresh, NULL);
  pthread_attr_destroy(&thattr);
}

pthread_t doRegisterCacheObj(SCacheObj* pCacheObj) {
  pthread_once(&cacheThreadInit, doInitRefreshThread);

  pthread_mutex_lock(&guard);
  taosArrayPush(pCacheArrayList, &pCacheObj);
  pthread_mutex_unlock(&guard);

  return cacheRefreshWorker;
}

H
hzcheng 已提交
96 97 98 99 100 101
/**
 * @param key      key of object for hash, usually a null-terminated string
 * @param keyLen   length of key
 * @param pData    actually data. required a consecutive memory block, no pointer is allowed
 *                 in pData. Pointer copy causes memory access error.
 * @param size     size of block
102 103
 * @param lifespan total survial expiredTime from now
 * @return         SCacheDataNode
H
hzcheng 已提交
104
 */
105
static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration);
H
hzcheng 已提交
106 107

/**
108
 * addedTime object node into trash, and this object is closed for referencing if it is addedTime to trash
H
hzcheng 已提交
109
 * It will be removed until the pNode->refCount == 0
110
 * @param pCacheObj    Cache object
H
hzcheng 已提交
111 112
 * @param pNode   Cache slot object
 */
H
Haojun Liao 已提交
113
static void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheDataNode *pNode);
114

H
hzcheng 已提交
115 116 117
/**
 * remove nodes in trash with refCount == 0 in cache
 * @param pNode
118
 * @param pCacheObj
H
hzcheng 已提交
119 120 121
 * @param force   force model, if true, remove data in trash without check refcount.
 *                may cause corruption. So, forece model only applys before cache is closed
 */
H
Haojun Liao 已提交
122
static void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force);
H
hzcheng 已提交
123 124 125

/**
 * release node
126
 * @param pCacheObj      cache object
H
Haojun Liao 已提交
127
 * @param pNode          data node
H
hzcheng 已提交
128
 */
129
static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
H
hzcheng 已提交
130
  if (pNode->signature != (uint64_t)pNode) {
S
slguan 已提交
131
    uError("key:%s, %p data is invalid, or has been released", pNode->key, pNode);
H
hzcheng 已提交
132 133
    return;
  }
134

H
Haojun Liao 已提交
135
  atomic_sub_fetch_64(&pCacheObj->totalSize, pNode->size);
S
Shengliang Guan 已提交
136
  int32_t size = (int32_t)taosHashGetSize(pCacheObj->pHashTable);
H
Haojun Liao 已提交
137 138
  assert(size > 0);

H
Haojun Liao 已提交
139
  uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, total num:%d size:%" PRId64 "bytes",
H
Haojun Liao 已提交
140
         pCacheObj->name, pNode->key, pNode->data, pNode->size, size - 1, pCacheObj->totalSize);
H
Haojun Liao 已提交
141 142 143 144

  if (pCacheObj->freeFp) {
    pCacheObj->freeFp(pNode->data);
  }
145

H
hzcheng 已提交
146 147 148
  free(pNode);
}

H
Haojun Liao 已提交
149
static FORCE_INLINE STrashElem* doRemoveElemInTrashcan(SCacheObj* pCacheObj, STrashElem *pElem) {
H
Haojun Liao 已提交
150
  if (pElem->pData->signature != (uint64_t) pElem->pData) {
H
Haojun Liao 已提交
151
    uWarn("key:sig:0x%" PRIx64 " %p data has been released, ignore", pElem->pData->signature, pElem->pData);
H
Haojun Liao 已提交
152
    return NULL;
H
Haojun Liao 已提交
153 154
  }

H
Haojun Liao 已提交
155 156
  STrashElem* next = pElem->next;

H
Haojun Liao 已提交
157 158 159 160 161 162 163
  pCacheObj->numOfElemsInTrash--;
  if (pElem->prev) {
    pElem->prev->next = pElem->next;
  } else { // pnode is the header, update header
    pCacheObj->pTrash = pElem->next;
  }

H
Haojun Liao 已提交
164 165
  if (next) {
    next->prev = pElem->prev;
H
Haojun Liao 已提交
166
  }
H
Haojun Liao 已提交
167 168 169 170 171 172

  if (pCacheObj->numOfElemsInTrash == 0) {
    assert(pCacheObj->pTrash == NULL);
  }

  return next;
H
Haojun Liao 已提交
173 174 175 176 177 178 179 180 181 182
}

static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj* pCacheObj, STrashElem *pElem) {
  if (pCacheObj->freeFp) {
    pCacheObj->freeFp(pElem->pData->data);
  }

  free(pElem->pData);
  free(pElem);
}
H
hzcheng 已提交
183

184
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn, const char* cacheName) {
185 186
  const int32_t SLEEP_DURATION = 500; //500 ms

H
Haojun Liao 已提交
187
  if (refreshTimeInSeconds <= 0) {
H
hzcheng 已提交
188 189
    return NULL;
  }
190
  
191 192
  SCacheObj *pCacheObj = (SCacheObj *)calloc(1, sizeof(SCacheObj));
  if (pCacheObj == NULL) {
S
slguan 已提交
193
    uError("failed to allocate memory, reason:%s", strerror(errno));
H
hzcheng 已提交
194 195
    return NULL;
  }
196
  
H
Haojun Liao 已提交
197
  pCacheObj->pHashTable = taosHashInit(4096, taosGetDefaultHashFunction(keyType), false, HASH_ENTRY_LOCK);
H
Haojun Liao 已提交
198
  pCacheObj->name = strdup(cacheName);
199 200
  if (pCacheObj->pHashTable == NULL) {
    free(pCacheObj);
S
slguan 已提交
201
    uError("failed to allocate memory, reason:%s", strerror(errno));
H
hzcheng 已提交
202 203
    return NULL;
  }
204
  
H
Haojun Liao 已提交
205
  // set free cache node callback function
206
  pCacheObj->freeFp      = fn;
H
Haojun Liao 已提交
207
  pCacheObj->refreshTime = refreshTimeInSeconds * 1000;
208 209
  pCacheObj->checkTick   = pCacheObj->refreshTime / SLEEP_DURATION;
  pCacheObj->extendLifespan = extendLifespan;  // the TTL after the last access
210

211 212 213
  if (__cache_lock_init(pCacheObj) != 0) {
    taosHashCleanup(pCacheObj->pHashTable);
    free(pCacheObj);
214
    
S
slguan 已提交
215
    uError("failed to init lock, reason:%s", strerror(errno));
H
hzcheng 已提交
216 217
    return NULL;
  }
218

219
  doRegisterCacheObj(pCacheObj);
220
  return pCacheObj;
221
}
H
hzcheng 已提交
222

223
void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int durationMS) {
224
  if (pCacheObj == NULL || pCacheObj->pHashTable == NULL || pCacheObj->deleting == 1) {
225 226
    return NULL;
  }
H
Haojun Liao 已提交
227

228
  SCacheDataNode *pNode1 = taosCreateCacheNode(key, keyLen, pData, dataSize, durationMS);
H
Haojun Liao 已提交
229 230 231 232 233
  if (pNode1 == NULL) {
    uError("cache:%s, key:%p, failed to added into cache, out of memory", pCacheObj->name, key);
    return NULL;
  }

H
Haojun Liao 已提交
234
  T_REF_INC(pNode1);
H
Haojun Liao 已提交
235

H
Haojun Liao 已提交
236 237
  int32_t succ = taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *));
  if (succ == 0) {
H
Haojun Liao 已提交
238
    atomic_add_fetch_64(&pCacheObj->totalSize, pNode1->size);
H
Haojun Liao 已提交
239
    uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64
H
Haojun Liao 已提交
240
           ", totalNum:%d totalSize:%" PRId64 "bytes size:%" PRId64 "bytes",
H
Haojun Liao 已提交
241 242
           pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime,
           (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, (int64_t)dataSize);
H
Haojun Liao 已提交
243 244
  } else {  // duplicated key exists
    while (1) {
H
Haojun Liao 已提交
245
      SCacheDataNode* p = NULL;
H
Haojun Liao 已提交
246 247
//      int32_t ret = taosHashRemoveWithData(pCacheObj->pHashTable, key, keyLen, (void*) &p, sizeof(void*));
      int32_t ret = taosHashRemove(pCacheObj->pHashTable, key, keyLen);
H
Haojun Liao 已提交
248 249

      // add to trashcan
H
Haojun Liao 已提交
250 251 252 253 254 255
      if (ret == 0) {
        if (T_REF_VAL_GET(p) == 0) {
          if (pCacheObj->freeFp) {
            pCacheObj->freeFp(p->data);
          }

H
Haojun Liao 已提交
256
          atomic_sub_fetch_64(&pCacheObj->totalSize, p->size);
S
TD-1848  
Shengliang Guan 已提交
257
          tfree(p);
H
Haojun Liao 已提交
258
        } else {
H
Haojun Liao 已提交
259
          taosAddToTrashcan(pCacheObj, p);
H
Haojun Liao 已提交
260
          uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode1->data, p->data);
H
Haojun Liao 已提交
261 262
        }
      }
H
Haojun Liao 已提交
263

H
Haojun Liao 已提交
264
      assert(T_REF_VAL_GET(pNode1) == 1);
265

H
Haojun Liao 已提交
266
      ret = taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *));
H
Haojun Liao 已提交
267 268
      if (ret == 0) {
        atomic_add_fetch_64(&pCacheObj->totalSize, pNode1->size);
269

H
Haojun Liao 已提交
270
        uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64
H
Haojun Liao 已提交
271 272 273
               ", totalNum:%d totalSize:%" PRId64 "bytes size:%" PRId64 "bytes",
               pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime,
               (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, (int64_t)dataSize);
H
Haojun Liao 已提交
274

H
Haojun Liao 已提交
275
        return pNode1->data;
H
Haojun Liao 已提交
276

H
Haojun Liao 已提交
277 278 279 280
      } else {
        // failed, try again
      }
    }
H
Haojun Liao 已提交
281 282 283
  }

  return pNode1->data;
H
hzcheng 已提交
284 285
}

H
Haojun Liao 已提交
286 287 288 289
static void incRefFn(void* ptNode) {
  assert(ptNode != NULL);

  SCacheDataNode** p = (SCacheDataNode**) ptNode;
H
Haojun Liao 已提交
290
  assert(T_REF_VAL_GET(*p) >= 0);
H
Haojun Liao 已提交
291

H
Haojun Liao 已提交
292 293 294 295
  int32_t ret = T_REF_INC(*p);
  assert(ret > 0);
}

H
Haojun Liao 已提交
296
void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) {
297 298 299 300 301 302
  if (pCacheObj == NULL || pCacheObj->deleting == 1) {
    return NULL;
  }

  if (taosHashGetSize(pCacheObj->pHashTable) == 0) {
    atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
303
    return NULL;
304
  }
H
Haojun Liao 已提交
305

H
Haojun Liao 已提交
306
  SCacheDataNode* ptNode = NULL;
H
Haojun Liao 已提交
307 308
  taosHashGetClone(pCacheObj->pHashTable, key, keyLen, &ptNode);
//  taosHashGetClone(pCacheObj->pHashTable, key, keyLen, incRefFn, &ptNode);
S
Shengliang Guan 已提交
309

H
Haojun Liao 已提交
310
  void* pData = (ptNode != NULL)? ptNode->data:NULL;
S
Shengliang Guan 已提交
311 312

  if (pData != NULL) {
313
    atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
H
Haojun Liao 已提交
314
    uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData, T_REF_VAL_GET(ptNode));
315
  } else {
316
    atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
H
Haojun Liao 已提交
317
    uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
318
  }
S
Shengliang Guan 已提交
319

320
  atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
S
Shengliang Guan 已提交
321
  return pData;
322 323
}

324 325
void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
  if (pCacheObj == NULL || data == NULL) return NULL;
326
  
327 328
  size_t          offset = offsetof(SCacheDataNode, data);
  SCacheDataNode *ptNode = (SCacheDataNode *)((char *)data - offset);
329
  
330
  if (ptNode->signature != (uint64_t)ptNode) {
H
Haojun Liao 已提交
331
    uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode);
332 333
    return NULL;
  }
334

335
  int32_t ref = T_REF_INC(ptNode);
H
Haojun Liao 已提交
336
  uDebug("cache:%s, data: %p acquired by data in cache, refcnt:%d", pCacheObj->name, ptNode->data, ref);
H
Haojun Liao 已提交
337

338 339 340 341 342
  // the data if referenced by at least one object, so the reference count must be greater than the value of 2.
  assert(ref >= 2);
  return data;
}

343
void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) {
H
Haojun Liao 已提交
344
  if (pCacheObj == NULL || data == NULL || (*data) == NULL) return NULL;
345
  
346 347
  size_t          offset = offsetof(SCacheDataNode, data);
  SCacheDataNode *ptNode = (SCacheDataNode *)((char *)(*data) - offset);
348
  
349
  if (ptNode->signature != (uint64_t)ptNode) {
H
Haojun Liao 已提交
350
    uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode);
351 352 353
    return NULL;
  }
  
354
  assert(T_REF_VAL_GET(ptNode) >= 1);
355
  
356
  char *d = *data;
357 358 359 360 361
  
  // clear its reference to old area
  *data = NULL;
  
  return d;
H
hzcheng 已提交
362
}
363

364
void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
H
Haojun Liao 已提交
365
  if (pCacheObj == NULL) {
H
Haojun Liao 已提交
366 367 368
    return;
  }

H
Haojun Liao 已提交
369
  if ((*data) == NULL) {
H
Haojun Liao 已提交
370
    uError("cache:%s, NULL data to release", pCacheObj->name);
371 372
    return;
  }
H
Haojun Liao 已提交
373 374 375 376 377


  // The operation of removal from hash table and addition to trashcan is not an atomic operation,
  // therefore the check for the empty of both the hash table and the trashcan has a race condition.
  // It happens when there is only one object in the cache, and two threads which has referenced this object
H
Haojun Liao 已提交
378
  // start to free the it simultaneously [TD-1569].
379 380 381 382
  size_t offset = offsetof(SCacheDataNode, data);
  
  SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset);
  if (pNode->signature != (uint64_t)pNode) {
H
Haojun Liao 已提交
383
    uError("cache:%s, %p, release invalid cache data", pCacheObj->name, pNode);
384 385
    return;
  }
386

387
  *data = NULL;
388

389
  // note: extend lifespan before dec ref count
H
Haojun Liao 已提交
390
  bool inTrashcan = pNode->inTrashcan;
H
Haojun Liao 已提交
391

H
Haojun Liao 已提交
392
  if (pCacheObj->extendLifespan && (!inTrashcan) && (!_remove)) {
393
    atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs());
394
    uDebug("cache:%s, data:%p extend expire time: %"PRId64, pCacheObj->name, pNode->data, pNode->expireTime);
395
  }
H
Haojun Liao 已提交
396

H
Haojun Liao 已提交
397 398
  if (_remove) {
    // NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
H
Haojun Liao 已提交
399 400 401
    char* key = pNode->key;
    char* d = pNode->data;

402
    int32_t ref = T_REF_VAL_GET(pNode);
H
Haojun Liao 已提交
403
    uDebug("cache:%s, key:%p, %p is released, refcnt:%d, in trashcan:%d", pCacheObj->name, key, d, ref - 1, inTrashcan);
H
Haojun Liao 已提交
404 405 406 407 408 409 410 411

    /*
     * If it is not referenced by other users, remove it immediately. Otherwise move this node to trashcan wait for all users
     * releasing this resources.
     *
     * NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread
     * that tries to do the same thing.
     */
H
Haojun Liao 已提交
412
    if (inTrashcan) {
H
Haojun Liao 已提交
413
      ref = T_REF_VAL_GET(pNode);
414

H
Haojun Liao 已提交
415 416 417
      if (ref == 1) {
        // If it is the last ref, remove it from trashcan linked-list first, and then destroy it.Otherwise, it may be
        // destroyed by refresh worker if decrease ref count before removing it from linked-list.
H
Haojun Liao 已提交
418
        assert(pNode->pTNodeHeader->pData == pNode);
H
Haojun Liao 已提交
419

H
Haojun Liao 已提交
420 421 422 423
        __cache_wr_lock(pCacheObj);
        doRemoveElemInTrashcan(pCacheObj, pNode->pTNodeHeader);
        __cache_unlock(pCacheObj);

H
Haojun Liao 已提交
424 425 426
        ref = T_REF_DEC(pNode);
        assert(ref == 0);

H
Haojun Liao 已提交
427
        doDestroyTrashcanElem(pCacheObj, pNode->pTNodeHeader);
H
Haojun Liao 已提交
428 429
      } else {
        ref = T_REF_DEC(pNode);
H
Haojun Liao 已提交
430
        assert(ref >= 0);
H
Haojun Liao 已提交
431 432
      }
    } else {
433 434
      // NOTE: remove it from hash in the first place, otherwise, the pNode may have been released by other thread
      // when reaches here.
H
Haojun Liao 已提交
435
      SCacheDataNode *p = NULL;
H
Haojun Liao 已提交
436 437
      int32_t ret = taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
//      int32_t ret = taosHashRemoveWithData(pCacheObj->pHashTable, pNode->key, pNode->keySize, &p, sizeof(void *));
438
      ref = T_REF_DEC(pNode);
H
Haojun Liao 已提交
439 440 441 442

      // successfully remove from hash table, if failed, this node must have been move to trash already, do nothing.
      // note that the remove operation can be executed only once.
      if (ret == 0) {
H
Haojun Liao 已提交
443
        if (p != pNode) {
H
Haojun Liao 已提交
444
          uDebug( "cache:%s, key:%p, successfully removed a new entry:%p, refcnt:%d, prev entry:%p has been removed by "
H
Haojun Liao 已提交
445 446 447
              "others already", pCacheObj->name, pNode->key, p->data, T_REF_VAL_GET(p), pNode->data);

          assert(p->pTNodeHeader == NULL);
H
Haojun Liao 已提交
448
          taosAddToTrashcan(pCacheObj, p);
H
Haojun Liao 已提交
449
        } else {
H
Haojun Liao 已提交
450
          uDebug("cache:%s, key:%p, %p successfully removed from hash table, refcnt:%d", pCacheObj->name, pNode->key,
H
Haojun Liao 已提交
451 452 453
                 pNode->data, ref);
          if (ref > 0) {
            assert(pNode->pTNodeHeader == NULL);
454

H
Haojun Liao 已提交
455
            taosAddToTrashcan(pCacheObj, pNode);
H
Haojun Liao 已提交
456 457
          } else {  // ref == 0
            atomic_sub_fetch_64(&pCacheObj->totalSize, pNode->size);
H
Haojun Liao 已提交
458

H
Haojun Liao 已提交
459
            int32_t size = (int32_t)taosHashGetSize(pCacheObj->pHashTable);
H
Haojun Liao 已提交
460
            uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, totalNum:%d size:%" PRId64 "bytes",
H
Haojun Liao 已提交
461
                   pCacheObj->name, pNode->key, pNode->data, pNode->size, size, pCacheObj->totalSize);
H
Haojun Liao 已提交
462

H
Haojun Liao 已提交
463 464 465
            if (pCacheObj->freeFp) {
              pCacheObj->freeFp(pNode->data);
            }
H
Haojun Liao 已提交
466

H
Haojun Liao 已提交
467
            free(pNode);
H
Haojun Liao 已提交
468
          }
H
Haojun Liao 已提交
469
        }
470
      } else {
H
Haojun Liao 已提交
471
        uDebug("cache:%s, key:%p, %p has been removed from hash table by others already, refcnt:%d",
H
Haojun Liao 已提交
472
               pCacheObj->name, pNode->key, pNode->data, ref);
H
Haojun Liao 已提交
473
      }
H
Haojun Liao 已提交
474 475
    }

476
  } else {
H
Haojun Liao 已提交
477
    // NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
H
Haojun Liao 已提交
478 479 480
    char* key = pNode->key;
    char* p = pNode->data;

H
Haojun Liao 已提交
481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500
//    int32_t ref = T_REF_VAL_GET(pNode);
//
//    if (ref == 1 && inTrashcan) {
//      // If it is the last ref, remove it from trashcan linked-list first, and then destroy it.Otherwise, it may be
//      // destroyed by refresh worker if decrease ref count before removing it from linked-list.
//      assert(pNode->pTNodeHeader->pData == pNode);
//
//      __cache_wr_lock(pCacheObj);
//      doRemoveElemInTrashcan(pCacheObj, pNode->pTNodeHeader);
//      __cache_unlock(pCacheObj);
//
//      ref = T_REF_DEC(pNode);
//      assert(ref == 0);
//
//      doDestroyTrashcanElem(pCacheObj, pNode->pTNodeHeader);
//    } else {
//      ref = T_REF_DEC(pNode);
//      assert(ref >= 0);
//    }

H
Haojun Liao 已提交
501
    int32_t ref = T_REF_DEC(pNode);
H
Haojun Liao 已提交
502
    uDebug("cache:%s, key:%p, %p released, refcnt:%d, data in trashcan:%d", pCacheObj->name, key, p, ref, inTrashcan);
H
Haojun Liao 已提交
503 504
  }
}
H
Haojun Liao 已提交
505

H
Haojun Liao 已提交
506 507 508
typedef struct SHashTravSupp {
  SCacheObj* pCacheObj;
  int64_t    time;
A
AlexDuan 已提交
509
  __cache_trav_fn_t fp;
A
AlexDuan 已提交
510
  void* param1;
H
Haojun Liao 已提交
511 512 513 514 515 516 517 518 519 520 521
} SHashTravSupp;

static bool travHashTableEmptyFn(void* param, void* data) {
  SHashTravSupp* ps = (SHashTravSupp*) param;
  SCacheObj* pCacheObj= ps->pCacheObj;

  SCacheDataNode *pNode = *(SCacheDataNode **) data;

  if (T_REF_VAL_GET(pNode) == 0) {
    taosCacheReleaseNode(pCacheObj, pNode);
  } else { // do add to trashcan
H
Haojun Liao 已提交
522
    taosAddToTrashcan(pCacheObj, pNode);
523
  }
H
Haojun Liao 已提交
524 525 526

  // this node should be remove from hash table
  return false;
527 528
}

529
void taosCacheEmpty(SCacheObj *pCacheObj) {
H
Haojun Liao 已提交
530 531
  SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()};

H
Haojun Liao 已提交
532
//  taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup);
H
Haojun Liao 已提交
533
  taosTrashcanEmpty(pCacheObj, false);
534 535 536 537 538 539
}

void taosCacheCleanup(SCacheObj *pCacheObj) {
  if (pCacheObj == NULL) {
    return;
  }
540 541

  pCacheObj->deleting = 1;
542 543

  // wait for the refresh thread quit before destroying the cache object.
544 545 546 547
  // But in the dll, the child thread will be killed before atexit takes effect.
  while(atomic_load_8(&pCacheObj->deleting) != 0) {
    if (refreshWorkerNormalStopped) break;    
    if (refreshWorkerUnexpectedStopped) return;    
548
    taosMsleep(50);
S
TD-2805  
Shengliang Guan 已提交
549
  }
550

H
Haojun Liao 已提交
551
  uInfo("cache:%s will be cleaned up", pCacheObj->name);
552 553 554
  doCleanupDataCache(pCacheObj);
}

H
Haojun Liao 已提交
555
SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration) {
556
  size_t totalSize = size + sizeof(SCacheDataNode) + keyLen;
557 558 559 560 561 562 563 564 565 566

  SCacheDataNode *pNewNode = calloc(1, totalSize);
  if (pNewNode == NULL) {
    uError("failed to allocate memory, reason:%s", strerror(errno));
    return NULL;
  }

  memcpy(pNewNode->data, pData, size);

  pNewNode->key = (char *)pNewNode + sizeof(SCacheDataNode) + size;
S
Shengliang Guan 已提交
567
  pNewNode->keySize = (uint16_t)keyLen;
568 569 570

  memcpy(pNewNode->key, key, keyLen);

H
Haojun Liao 已提交
571 572
  pNewNode->addedTime    = (uint64_t)taosGetTimestampMs();
  pNewNode->lifespan     = duration;
573
  pNewNode->expireTime   = pNewNode->addedTime + pNewNode->lifespan;
H
Haojun Liao 已提交
574 575
  pNewNode->signature    = (uint64_t)pNewNode;
  pNewNode->size         = (uint32_t)totalSize;
576 577 578 579

  return pNewNode;
}

H
Haojun Liao 已提交
580
void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
H
Haojun Liao 已提交
581
  if (pNode->inTrashcan) { /* node is already in trash */
582
    assert(pNode->pTNodeHeader != NULL && pNode->pTNodeHeader->pData == pNode);
583 584 585
    return;
  }

586
  __cache_wr_lock(pCacheObj);
587 588
  STrashElem *pElem = calloc(1, sizeof(STrashElem));
  pElem->pData = pNode;
589 590
  pElem->prev = NULL;
  pElem->next = NULL;
H
Haojun Liao 已提交
591
  pNode->inTrashcan = true;
592
  pNode->pTNodeHeader = pElem;
H
Haojun Liao 已提交
593

594 595 596 597 598 599 600
  pElem->next = pCacheObj->pTrash;
  if (pCacheObj->pTrash) {
    pCacheObj->pTrash->prev = pElem;
  }

  pCacheObj->pTrash = pElem;
  pCacheObj->numOfElemsInTrash++;
H
Haojun Liao 已提交
601
  __cache_unlock(pCacheObj);
602

603 604
  uDebug("cache:%s key:%p, %p move to trashcan, pTrashElem:%p, numOfElem in trashcan:%d", pCacheObj->name, pNode->key,
         pNode->data, pElem, pCacheObj->numOfElemsInTrash);
605 606
}

H
Haojun Liao 已提交
607
void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) {
608 609 610 611
  __cache_wr_lock(pCacheObj);

  if (pCacheObj->numOfElemsInTrash == 0) {
    if (pCacheObj->pTrash != NULL) {
H
Haojun Liao 已提交
612
      pCacheObj->pTrash = NULL;
H
Haojun Liao 已提交
613
      uError("cache:%s, key:inconsistency data in cache, numOfElem in trashcan:%d", pCacheObj->name, pCacheObj->numOfElemsInTrash);
614 615 616 617
    }

    __cache_unlock(pCacheObj);
    return;
H
hjxilinx 已提交
618
  }
619

H
Haojun Liao 已提交
620 621 622
  const char* stat[] = {"false", "true"};
  uDebug("cache:%s start to cleanup trashcan, numOfElem in trashcan:%d, free:%s", pCacheObj->name,
      pCacheObj->numOfElemsInTrash, (force? stat[1]:stat[0]));
623

H
Haojun Liao 已提交
624
  STrashElem *pElem = pCacheObj->pTrash;
625 626
  while (pElem) {
    T_REF_VAL_CHECK(pElem->pData);
H
Haojun Liao 已提交
627
    assert(pElem->next != pElem && pElem->prev != pElem);
628 629

    if (force || (T_REF_VAL_GET(pElem->pData) == 0)) {
H
Haojun Liao 已提交
630
      uDebug("cache:%s, key:%p, %p removed from trashcan. numOfElem in trashcan:%d", pCacheObj->name, pElem->pData->key, pElem->pData->data,
631 632
             pCacheObj->numOfElemsInTrash - 1);

H
Haojun Liao 已提交
633 634 635
      doRemoveElemInTrashcan(pCacheObj, pElem);
      doDestroyTrashcanElem(pCacheObj, pElem);
      pElem = pCacheObj->pTrash;
636 637 638 639 640 641 642 643 644
    } else {
      pElem = pElem->next;
    }
  }

  __cache_unlock(pCacheObj);
}

void doCleanupDataCache(SCacheObj *pCacheObj) {
H
Haojun Liao 已提交
645 646
//  SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()};
//  taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup);
647

H
Haojun Liao 已提交
648 649
  // todo memory leak if there are object with refcount greater than 0 in hash table?
  taosHashCleanup(pCacheObj->pHashTable);
650
  taosTrashcanEmpty(pCacheObj, true);
H
Haojun Liao 已提交
651

652
  __cache_lock_destroy(pCacheObj);
H
Haojun Liao 已提交
653
  
S
TD-1848  
Shengliang Guan 已提交
654
  tfree(pCacheObj->name);
655 656
  memset(pCacheObj, 0, sizeof(SCacheObj));
  free(pCacheObj);
657
}
658

H
Haojun Liao 已提交
659 660 661
bool travHashTableFn(void* param, void* data) {
  SHashTravSupp* ps = (SHashTravSupp*) param;
  SCacheObj*     pCacheObj= ps->pCacheObj;
662

H
Haojun Liao 已提交
663
  SCacheDataNode* pNode = *(SCacheDataNode **) data;
S
Shengliang Guan 已提交
664
  if ((int64_t)pNode->expireTime < ps->time && T_REF_VAL_GET(pNode) <= 0) {
H
Haojun Liao 已提交
665
    taosCacheReleaseNode(pCacheObj, pNode);
H
Haojun Liao 已提交
666

H
Haojun Liao 已提交
667 668 669
    // this node should be remove from hash table
    return false;
  }
670

H
Haojun Liao 已提交
671
  if (ps->fp) {
A
AlexDuan 已提交
672
    (ps->fp)(pNode->data, ps->param1);
673 674
  }

H
Haojun Liao 已提交
675 676 677 678
  // do not remove element in hash table
  return true;
}

A
AlexDuan 已提交
679
static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_trav_fn_t fp, void* param1) {
H
Haojun Liao 已提交
680
  assert(pCacheObj != NULL);
681

A
AlexDuan 已提交
682
  SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = fp, .time = time, .param1 = param1};
H
Haojun Liao 已提交
683
//  taosHashCondTraverse(pCacheObj->pHashTable, travHashTableFn, &sup);
684 685
}

686 687 688 689 690 691
void taosCacheRefreshWorkerUnexpectedStopped(void) {
  if(!refreshWorkerNormalStopped) {
    refreshWorkerUnexpectedStopped=true;
  }
}

692
void* taosCacheTimedRefresh(void *handle) {
693 694
  assert(pCacheArrayList != NULL);
  uDebug("cache refresh thread starts");
695

H
Haojun Liao 已提交
696
  setThreadName("cacheRefresh");
697

698 699
  const int32_t SLEEP_DURATION = 500; //500 ms
  int64_t count = 0;
700
  atexit(taosCacheRefreshWorkerUnexpectedStopped);
701

702
  while(1) {
703
    taosMsleep(SLEEP_DURATION);
704 705 706
    if (stopRefreshWorker) {
      goto _end;
    }
H
Haojun Liao 已提交
707

708 709 710
    pthread_mutex_lock(&guard);
    size_t size = taosArrayGetSize(pCacheArrayList);
    pthread_mutex_unlock(&guard);
711

712
    count += 1;
713

714 715 716
    for(int32_t i = 0; i < size; ++i) {
      pthread_mutex_lock(&guard);
      SCacheObj* pCacheObj = taosArrayGetP(pCacheArrayList, i);
717

718
      if (pCacheObj == NULL) {
719 720 721
        uError("object is destroyed. ignore and try next");
        pthread_mutex_unlock(&guard);
        continue;
722
      }
723

724 725
      // check if current cache object will be deleted every 500ms.
      if (pCacheObj->deleting) {
726 727 728
        taosArrayRemove(pCacheArrayList, i);
        size = taosArrayGetSize(pCacheArrayList);

H
Haojun Liao 已提交
729
        uDebug("%s is destroying, remove it from refresh list, remain cache obj:%"PRIzu, pCacheObj->name, size);
730
        pCacheObj->deleting = 0;  //reset the deleting flag to enable pCacheObj to continue releasing resources.
731

732 733
        pthread_mutex_unlock(&guard);
        continue;
734
      }
735

736 737
      pthread_mutex_unlock(&guard);

738 739 740 741 742 743 744 745 746 747 748 749 750 751 752
      if ((count % pCacheObj->checkTick) != 0) {
        continue;
      }

      size_t elemInHash = taosHashGetSize(pCacheObj->pHashTable);
      if (elemInHash + pCacheObj->numOfElemsInTrash == 0) {
        continue;
      }

      uDebug("%s refresh thread scan", pCacheObj->name);
      pCacheObj->statistics.refreshCount++;

      // refresh data in hash table
      if (elemInHash > 0) {
        int64_t now = taosGetTimestampMs();
A
AlexDuan 已提交
753
        doCacheRefresh(pCacheObj, now, NULL, NULL);
754 755 756 757
      }

      taosTrashcanEmpty(pCacheObj, false);
    }
758 759
  }

760 761
  _end:
  taosArrayDestroy(pCacheArrayList);
H
Haojun Liao 已提交
762 763 764

  pCacheArrayList = NULL;
  pthread_mutex_destroy(&guard);
765
  refreshWorkerNormalStopped=true;
H
Haojun Liao 已提交
766

767
  uDebug("cache refresh thread quits");
768
  return NULL;
dengyihao's avatar
dengyihao 已提交
769
}
770

A
AlexDuan 已提交
771
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void* param1) {
772 773 774 775 776
  if (pCacheObj == NULL) {
    return;
  }

  int64_t now = taosGetTimestampMs();
A
AlexDuan 已提交
777
  doCacheRefresh(pCacheObj, now, fp, param1);
778
}
779

780 781
void taosStopCacheRefreshWorker(void) {
  stopRefreshWorker = true;
782
}