tcache.c 25.0 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#define _DEFAULT_SOURCE
S
cache  
Shengliang Guan 已提交
17
#include "tcache.h"
S
log  
Shengliang Guan 已提交
18
#include "tlog.h"
H
hzcheng 已提交
19 20 21
#include "ttimer.h"
#include "tutil.h"

22
static FORCE_INLINE void __cache_wr_lock(SCacheObj *pCacheObj) {
23
#if defined(LINUX)
24
  pthread_rwlock_wrlock(&pCacheObj->lock);
25
#else
26
  pthread_mutex_lock(&pCacheObj->lock);
27 28 29
#endif
}

30
static FORCE_INLINE void __cache_unlock(SCacheObj *pCacheObj) {
31
#if defined(LINUX)
32
  pthread_rwlock_unlock(&pCacheObj->lock);
33
#else
34
  pthread_mutex_unlock(&pCacheObj->lock);
35 36 37
#endif
}

38
static FORCE_INLINE int32_t __cache_lock_init(SCacheObj *pCacheObj) {
39
#if defined(LINUX)
40
  return pthread_rwlock_init(&pCacheObj->lock, NULL);
41
#else
42
  return pthread_mutex_init(&pCacheObj->lock, NULL);
43 44 45
#endif
}

46
static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pCacheObj) {
47
#if defined(LINUX)
48
  pthread_rwlock_destroy(&pCacheObj->lock);
49
#else
50
  pthread_mutex_destroy(&pCacheObj->lock);
51 52 53
#endif
}

54 55 56 57 58 59 60 61 62 63
/**
 * do cleanup the taos cache
 * @param pCacheObj
 */
static void doCleanupDataCache(SCacheObj *pCacheObj);

/**
 * refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime
 * @param handle   Cache object handle
 */
S
cache  
Shengliang Guan 已提交
64
static void *taosCacheTimedRefresh(void *handle);
65

S
cache  
Shengliang Guan 已提交
66 67 68 69 70 71 72
static pthread_t       cacheRefreshWorker = {0};
static pthread_once_t  cacheThreadInit = PTHREAD_ONCE_INIT;
static pthread_mutex_t guard = PTHREAD_MUTEX_INITIALIZER;
static SArray         *pCacheArrayList = NULL;
static bool            stopRefreshWorker = false;
static bool            refreshWorkerNormalStopped = false;
static bool            refreshWorkerUnexpectedStopped = false;
73

H
Haojun Liao 已提交
74
static void doInitRefreshThread(void) {
75 76 77 78 79 80 81 82 83 84
  pCacheArrayList = taosArrayInit(4, POINTER_BYTES);

  pthread_attr_t thattr;
  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);

  pthread_create(&cacheRefreshWorker, &thattr, taosCacheTimedRefresh, NULL);
  pthread_attr_destroy(&thattr);
}

S
cache  
Shengliang Guan 已提交
85
pthread_t doRegisterCacheObj(SCacheObj *pCacheObj) {
86 87 88 89 90 91 92 93 94
  pthread_once(&cacheThreadInit, doInitRefreshThread);

  pthread_mutex_lock(&guard);
  taosArrayPush(pCacheArrayList, &pCacheObj);
  pthread_mutex_unlock(&guard);

  return cacheRefreshWorker;
}

H
hzcheng 已提交
95 96 97 98 99 100
/**
 * @param key      key of object for hash, usually a null-terminated string
 * @param keyLen   length of key
 * @param pData    actually data. required a consecutive memory block, no pointer is allowed
 *                 in pData. Pointer copy causes memory access error.
 * @param size     size of block
101 102
 * @param lifespan total survial expiredTime from now
 * @return         SCacheDataNode
H
hzcheng 已提交
103
 */
S
cache  
Shengliang Guan 已提交
104 105
static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size,
                                           uint64_t duration);
H
hzcheng 已提交
106 107

/**
108
 * addedTime object node into trash, and this object is closed for referencing if it is addedTime to trash
H
hzcheng 已提交
109
 * It will be removed until the pNode->refCount == 0
110
 * @param pCacheObj    Cache object
H
hzcheng 已提交
111 112
 * @param pNode   Cache slot object
 */
H
Haojun Liao 已提交
113
static void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheDataNode *pNode);
114

H
hzcheng 已提交
115 116 117
/**
 * remove nodes in trash with refCount == 0 in cache
 * @param pNode
118
 * @param pCacheObj
H
hzcheng 已提交
119 120 121
 * @param force   force model, if true, remove data in trash without check refcount.
 *                may cause corruption. So, forece model only applys before cache is closed
 */
H
Haojun Liao 已提交
122
static void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force);
H
hzcheng 已提交
123 124 125

/**
 * release node
126
 * @param pCacheObj      cache object
H
Haojun Liao 已提交
127
 * @param pNode          data node
H
hzcheng 已提交
128
 */
129
static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
H
hzcheng 已提交
130
  if (pNode->signature != (uint64_t)pNode) {
S
slguan 已提交
131
    uError("key:%s, %p data is invalid, or has been released", pNode->key, pNode);
H
hzcheng 已提交
132 133
    return;
  }
134

H
Haojun Liao 已提交
135
  atomic_sub_fetch_64(&pCacheObj->totalSize, pNode->size);
S
Shengliang Guan 已提交
136
  int32_t size = (int32_t)taosHashGetSize(pCacheObj->pHashTable);
H
Haojun Liao 已提交
137 138
  assert(size > 0);

H
Haojun Liao 已提交
139
  uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, total num:%d size:%" PRId64 "bytes",
H
Haojun Liao 已提交
140
         pCacheObj->name, pNode->key, pNode->data, pNode->size, size - 1, pCacheObj->totalSize);
H
Haojun Liao 已提交
141 142 143 144

  if (pCacheObj->freeFp) {
    pCacheObj->freeFp(pNode->data);
  }
145

H
hzcheng 已提交
146 147 148
  free(pNode);
}

S
cache  
Shengliang Guan 已提交
149 150
static FORCE_INLINE STrashElem *doRemoveElemInTrashcan(SCacheObj *pCacheObj, STrashElem *pElem) {
  if (pElem->pData->signature != (uint64_t)pElem->pData) {
H
Haojun Liao 已提交
151
    uWarn("key:sig:0x%" PRIx64 " %p data has been released, ignore", pElem->pData->signature, pElem->pData);
H
Haojun Liao 已提交
152
    return NULL;
H
Haojun Liao 已提交
153 154
  }

S
cache  
Shengliang Guan 已提交
155
  STrashElem *next = pElem->next;
H
Haojun Liao 已提交
156

H
Haojun Liao 已提交
157 158 159
  pCacheObj->numOfElemsInTrash--;
  if (pElem->prev) {
    pElem->prev->next = pElem->next;
S
cache  
Shengliang Guan 已提交
160
  } else {  // pnode is the header, update header
H
Haojun Liao 已提交
161 162 163
    pCacheObj->pTrash = pElem->next;
  }

H
Haojun Liao 已提交
164 165
  if (next) {
    next->prev = pElem->prev;
H
Haojun Liao 已提交
166
  }
H
Haojun Liao 已提交
167 168 169 170 171 172

  if (pCacheObj->numOfElemsInTrash == 0) {
    assert(pCacheObj->pTrash == NULL);
  }

  return next;
H
Haojun Liao 已提交
173 174
}

S
cache  
Shengliang Guan 已提交
175
static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj *pCacheObj, STrashElem *pElem) {
H
Haojun Liao 已提交
176 177 178 179 180 181 182
  if (pCacheObj->freeFp) {
    pCacheObj->freeFp(pElem->pData->data);
  }

  free(pElem->pData);
  free(pElem);
}
H
hzcheng 已提交
183

S
cache  
Shengliang Guan 已提交
184 185 186
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn,
                         const char *cacheName) {
  const int32_t SLEEP_DURATION = 500;  // 500 ms
187

H
Haojun Liao 已提交
188
  if (refreshTimeInSeconds <= 0) {
H
hzcheng 已提交
189 190
    return NULL;
  }
S
cache  
Shengliang Guan 已提交
191

192 193
  SCacheObj *pCacheObj = (SCacheObj *)calloc(1, sizeof(SCacheObj));
  if (pCacheObj == NULL) {
S
slguan 已提交
194
    uError("failed to allocate memory, reason:%s", strerror(errno));
H
hzcheng 已提交
195 196
    return NULL;
  }
S
cache  
Shengliang Guan 已提交
197

H
Haojun Liao 已提交
198
  pCacheObj->pHashTable = taosHashInit(4096, taosGetDefaultHashFunction(keyType), false, HASH_ENTRY_LOCK);
H
Haojun Liao 已提交
199
  pCacheObj->name = strdup(cacheName);
200 201
  if (pCacheObj->pHashTable == NULL) {
    free(pCacheObj);
S
slguan 已提交
202
    uError("failed to allocate memory, reason:%s", strerror(errno));
H
hzcheng 已提交
203 204
    return NULL;
  }
S
cache  
Shengliang Guan 已提交
205

H
Haojun Liao 已提交
206
  // set free cache node callback function
S
cache  
Shengliang Guan 已提交
207
  pCacheObj->freeFp = fn;
H
Haojun Liao 已提交
208
  pCacheObj->refreshTime = refreshTimeInSeconds * 1000;
S
cache  
Shengliang Guan 已提交
209
  pCacheObj->checkTick = pCacheObj->refreshTime / SLEEP_DURATION;
210
  pCacheObj->extendLifespan = extendLifespan;  // the TTL after the last access
211

212 213 214
  if (__cache_lock_init(pCacheObj) != 0) {
    taosHashCleanup(pCacheObj->pHashTable);
    free(pCacheObj);
S
cache  
Shengliang Guan 已提交
215

S
slguan 已提交
216
    uError("failed to init lock, reason:%s", strerror(errno));
H
hzcheng 已提交
217 218
    return NULL;
  }
219

220
  doRegisterCacheObj(pCacheObj);
221
  return pCacheObj;
222
}
H
hzcheng 已提交
223

S
cache  
Shengliang Guan 已提交
224 225
void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize,
                   int32_t durationMS) {
226
  if (pCacheObj == NULL || pCacheObj->pHashTable == NULL || pCacheObj->deleting == 1) {
227 228
    return NULL;
  }
H
Haojun Liao 已提交
229

230
  SCacheDataNode *pNode1 = taosCreateCacheNode(key, keyLen, pData, dataSize, durationMS);
H
Haojun Liao 已提交
231 232 233 234 235
  if (pNode1 == NULL) {
    uError("cache:%s, key:%p, failed to added into cache, out of memory", pCacheObj->name, key);
    return NULL;
  }

H
Haojun Liao 已提交
236
  T_REF_INC(pNode1);
H
Haojun Liao 已提交
237

H
Haojun Liao 已提交
238 239
  int32_t succ = taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *));
  if (succ == 0) {
H
Haojun Liao 已提交
240
    atomic_add_fetch_64(&pCacheObj->totalSize, pNode1->size);
H
Haojun Liao 已提交
241
    uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64
H
Haojun Liao 已提交
242
           ", totalNum:%d totalSize:%" PRId64 "bytes size:%" PRId64 "bytes",
H
Haojun Liao 已提交
243 244
           pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime,
           (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, (int64_t)dataSize);
H
Haojun Liao 已提交
245 246
  } else {  // duplicated key exists
    while (1) {
S
cache  
Shengliang Guan 已提交
247 248
      SCacheDataNode *p = NULL;
      //      int32_t ret = taosHashRemoveWithData(pCacheObj->pHashTable, key, keyLen, (void*) &p, sizeof(void*));
H
Haojun Liao 已提交
249
      int32_t ret = taosHashRemove(pCacheObj->pHashTable, key, keyLen);
H
Haojun Liao 已提交
250 251

      // add to trashcan
H
Haojun Liao 已提交
252 253 254 255 256 257
      if (ret == 0) {
        if (T_REF_VAL_GET(p) == 0) {
          if (pCacheObj->freeFp) {
            pCacheObj->freeFp(p->data);
          }

H
Haojun Liao 已提交
258
          atomic_sub_fetch_64(&pCacheObj->totalSize, p->size);
S
TD-1848  
Shengliang Guan 已提交
259
          tfree(p);
H
Haojun Liao 已提交
260
        } else {
H
Haojun Liao 已提交
261
          taosAddToTrashcan(pCacheObj, p);
H
Haojun Liao 已提交
262
          uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode1->data, p->data);
H
Haojun Liao 已提交
263 264
        }
      }
H
Haojun Liao 已提交
265

H
Haojun Liao 已提交
266
      assert(T_REF_VAL_GET(pNode1) == 1);
267

H
Haojun Liao 已提交
268
      ret = taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *));
H
Haojun Liao 已提交
269 270
      if (ret == 0) {
        atomic_add_fetch_64(&pCacheObj->totalSize, pNode1->size);
271

H
Haojun Liao 已提交
272
        uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64
H
Haojun Liao 已提交
273 274 275
               ", totalNum:%d totalSize:%" PRId64 "bytes size:%" PRId64 "bytes",
               pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime,
               (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, (int64_t)dataSize);
H
Haojun Liao 已提交
276

H
Haojun Liao 已提交
277
        return pNode1->data;
H
Haojun Liao 已提交
278

H
Haojun Liao 已提交
279 280 281 282
      } else {
        // failed, try again
      }
    }
H
Haojun Liao 已提交
283 284 285
  }

  return pNode1->data;
H
hzcheng 已提交
286 287
}

S
cache  
Shengliang Guan 已提交
288
static void incRefFn(void *ptNode) {
H
Haojun Liao 已提交
289 290
  assert(ptNode != NULL);

S
cache  
Shengliang Guan 已提交
291
  SCacheDataNode **p = (SCacheDataNode **)ptNode;
H
Haojun Liao 已提交
292
  assert(T_REF_VAL_GET(*p) >= 0);
H
Haojun Liao 已提交
293

H
Haojun Liao 已提交
294 295 296 297
  int32_t ret = T_REF_INC(*p);
  assert(ret > 0);
}

H
Haojun Liao 已提交
298
void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) {
299 300 301 302 303 304
  if (pCacheObj == NULL || pCacheObj->deleting == 1) {
    return NULL;
  }

  if (taosHashGetSize(pCacheObj->pHashTable) == 0) {
    atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
305
    return NULL;
306
  }
H
Haojun Liao 已提交
307

H
Haojun Liao 已提交
308
  // TODO remove it
S
cache  
Shengliang Guan 已提交
309
  SCacheDataNode *ptNode = NULL;
H
Haojun Liao 已提交
310
  ptNode = taosHashAcquire(pCacheObj->pHashTable, key, keyLen);
S
cache  
Shengliang Guan 已提交
311
  //  taosHashGetClone(pCacheObj->pHashTable, key, keyLen, incRefFn, &ptNode);
S
Shengliang Guan 已提交
312

S
cache  
Shengliang Guan 已提交
313
  void *pData = (ptNode != NULL) ? ptNode->data : NULL;
S
Shengliang Guan 已提交
314 315

  if (pData != NULL) {
316
    atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
S
cache  
Shengliang Guan 已提交
317 318
    uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData,
           T_REF_VAL_GET(ptNode));
319
  } else {
320
    atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
H
Haojun Liao 已提交
321
    uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
322
  }
S
Shengliang Guan 已提交
323

324
  atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
S
Shengliang Guan 已提交
325
  return pData;
326 327
}

328 329
void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
  if (pCacheObj == NULL || data == NULL) return NULL;
S
cache  
Shengliang Guan 已提交
330

331 332
  size_t          offset = offsetof(SCacheDataNode, data);
  SCacheDataNode *ptNode = (SCacheDataNode *)((char *)data - offset);
S
cache  
Shengliang Guan 已提交
333

334
  if (ptNode->signature != (uint64_t)ptNode) {
H
Haojun Liao 已提交
335
    uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode);
336 337
    return NULL;
  }
338

339
  int32_t ref = T_REF_INC(ptNode);
H
Haojun Liao 已提交
340
  uDebug("cache:%s, data: %p acquired by data in cache, refcnt:%d", pCacheObj->name, ptNode->data, ref);
H
Haojun Liao 已提交
341

342 343 344 345 346
  // the data if referenced by at least one object, so the reference count must be greater than the value of 2.
  assert(ref >= 2);
  return data;
}

347
void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) {
H
Haojun Liao 已提交
348
  if (pCacheObj == NULL || data == NULL || (*data) == NULL) return NULL;
S
cache  
Shengliang Guan 已提交
349

350 351
  size_t          offset = offsetof(SCacheDataNode, data);
  SCacheDataNode *ptNode = (SCacheDataNode *)((char *)(*data) - offset);
S
cache  
Shengliang Guan 已提交
352

353
  if (ptNode->signature != (uint64_t)ptNode) {
H
Haojun Liao 已提交
354
    uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode);
355 356
    return NULL;
  }
S
cache  
Shengliang Guan 已提交
357

358
  assert(T_REF_VAL_GET(ptNode) >= 1);
S
cache  
Shengliang Guan 已提交
359

360
  char *d = *data;
S
cache  
Shengliang Guan 已提交
361

362 363
  // clear its reference to old area
  *data = NULL;
S
cache  
Shengliang Guan 已提交
364

365
  return d;
H
hzcheng 已提交
366
}
367

368
void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
H
Haojun Liao 已提交
369
  if (pCacheObj == NULL) {
H
Haojun Liao 已提交
370 371 372
    return;
  }

H
Haojun Liao 已提交
373
  if ((*data) == NULL) {
H
Haojun Liao 已提交
374
    uError("cache:%s, NULL data to release", pCacheObj->name);
375 376
    return;
  }
H
Haojun Liao 已提交
377 378 379 380

  // The operation of removal from hash table and addition to trashcan is not an atomic operation,
  // therefore the check for the empty of both the hash table and the trashcan has a race condition.
  // It happens when there is only one object in the cache, and two threads which has referenced this object
H
Haojun Liao 已提交
381
  // start to free the it simultaneously [TD-1569].
382
  size_t offset = offsetof(SCacheDataNode, data);
S
cache  
Shengliang Guan 已提交
383

384 385
  SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset);
  if (pNode->signature != (uint64_t)pNode) {
H
Haojun Liao 已提交
386
    uError("cache:%s, %p, release invalid cache data", pCacheObj->name, pNode);
387 388
    return;
  }
389

390
  *data = NULL;
391

392
  // note: extend lifespan before dec ref count
H
Haojun Liao 已提交
393
  bool inTrashcan = pNode->inTrashcan;
H
Haojun Liao 已提交
394

H
Haojun Liao 已提交
395
  if (pCacheObj->extendLifespan && (!inTrashcan) && (!_remove)) {
396
    atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs());
S
cache  
Shengliang Guan 已提交
397
    uDebug("cache:%s, data:%p extend expire time: %" PRId64, pCacheObj->name, pNode->data, pNode->expireTime);
398
  }
H
Haojun Liao 已提交
399

H
Haojun Liao 已提交
400 401
  if (_remove) {
    // NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
S
cache  
Shengliang Guan 已提交
402 403
    char *key = pNode->key;
    char *d = pNode->data;
H
Haojun Liao 已提交
404

405
    int32_t ref = T_REF_VAL_GET(pNode);
H
Haojun Liao 已提交
406
    uDebug("cache:%s, key:%p, %p is released, refcnt:%d, in trashcan:%d", pCacheObj->name, key, d, ref - 1, inTrashcan);
H
Haojun Liao 已提交
407 408

    /*
S
cache  
Shengliang Guan 已提交
409 410
     * If it is not referenced by other users, remove it immediately. Otherwise move this node to trashcan wait for all
     * users releasing this resources.
H
Haojun Liao 已提交
411 412 413 414
     *
     * NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread
     * that tries to do the same thing.
     */
H
Haojun Liao 已提交
415
    if (inTrashcan) {
H
Haojun Liao 已提交
416
      ref = T_REF_VAL_GET(pNode);
417

H
Haojun Liao 已提交
418 419 420
      if (ref == 1) {
        // If it is the last ref, remove it from trashcan linked-list first, and then destroy it.Otherwise, it may be
        // destroyed by refresh worker if decrease ref count before removing it from linked-list.
H
Haojun Liao 已提交
421
        assert(pNode->pTNodeHeader->pData == pNode);
H
Haojun Liao 已提交
422

H
Haojun Liao 已提交
423 424 425 426
        __cache_wr_lock(pCacheObj);
        doRemoveElemInTrashcan(pCacheObj, pNode->pTNodeHeader);
        __cache_unlock(pCacheObj);

H
Haojun Liao 已提交
427 428 429
        ref = T_REF_DEC(pNode);
        assert(ref == 0);

H
Haojun Liao 已提交
430
        doDestroyTrashcanElem(pCacheObj, pNode->pTNodeHeader);
H
Haojun Liao 已提交
431 432
      } else {
        ref = T_REF_DEC(pNode);
H
Haojun Liao 已提交
433
        assert(ref >= 0);
H
Haojun Liao 已提交
434 435
      }
    } else {
436 437
      // NOTE: remove it from hash in the first place, otherwise, the pNode may have been released by other thread
      // when reaches here.
H
Haojun Liao 已提交
438
      SCacheDataNode *p = NULL;
S
cache  
Shengliang Guan 已提交
439 440 441
      int32_t         ret = taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
      //      int32_t ret = taosHashRemoveWithData(pCacheObj->pHashTable, pNode->key, pNode->keySize, &p, sizeof(void
      //      *));
442
      ref = T_REF_DEC(pNode);
H
Haojun Liao 已提交
443 444 445 446

      // successfully remove from hash table, if failed, this node must have been move to trash already, do nothing.
      // note that the remove operation can be executed only once.
      if (ret == 0) {
H
Haojun Liao 已提交
447
        if (p != pNode) {
S
cache  
Shengliang Guan 已提交
448 449 450 451
          uDebug(
              "cache:%s, key:%p, successfully removed a new entry:%p, refcnt:%d, prev entry:%p has been removed by "
              "others already",
              pCacheObj->name, pNode->key, p->data, T_REF_VAL_GET(p), pNode->data);
H
Haojun Liao 已提交
452 453

          assert(p->pTNodeHeader == NULL);
H
Haojun Liao 已提交
454
          taosAddToTrashcan(pCacheObj, p);
H
Haojun Liao 已提交
455
        } else {
H
Haojun Liao 已提交
456
          uDebug("cache:%s, key:%p, %p successfully removed from hash table, refcnt:%d", pCacheObj->name, pNode->key,
H
Haojun Liao 已提交
457 458 459
                 pNode->data, ref);
          if (ref > 0) {
            assert(pNode->pTNodeHeader == NULL);
460

H
Haojun Liao 已提交
461
            taosAddToTrashcan(pCacheObj, pNode);
H
Haojun Liao 已提交
462 463
          } else {  // ref == 0
            atomic_sub_fetch_64(&pCacheObj->totalSize, pNode->size);
H
Haojun Liao 已提交
464

H
Haojun Liao 已提交
465
            int32_t size = (int32_t)taosHashGetSize(pCacheObj->pHashTable);
H
Haojun Liao 已提交
466
            uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, totalNum:%d size:%" PRId64 "bytes",
H
Haojun Liao 已提交
467
                   pCacheObj->name, pNode->key, pNode->data, pNode->size, size, pCacheObj->totalSize);
H
Haojun Liao 已提交
468

H
Haojun Liao 已提交
469 470 471
            if (pCacheObj->freeFp) {
              pCacheObj->freeFp(pNode->data);
            }
H
Haojun Liao 已提交
472

H
Haojun Liao 已提交
473
            free(pNode);
H
Haojun Liao 已提交
474
          }
H
Haojun Liao 已提交
475
        }
476
      } else {
S
cache  
Shengliang Guan 已提交
477 478
        uDebug("cache:%s, key:%p, %p has been removed from hash table by others already, refcnt:%d", pCacheObj->name,
               pNode->key, pNode->data, ref);
H
Haojun Liao 已提交
479
      }
H
Haojun Liao 已提交
480 481
    }

482
  } else {
H
Haojun Liao 已提交
483
    // NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
S
cache  
Shengliang Guan 已提交
484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506
    char *key = pNode->key;
    char *p = pNode->data;

    //    int32_t ref = T_REF_VAL_GET(pNode);
    //
    //    if (ref == 1 && inTrashcan) {
    //      // If it is the last ref, remove it from trashcan linked-list first, and then destroy it.Otherwise, it may
    //      be
    //      // destroyed by refresh worker if decrease ref count before removing it from linked-list.
    //      assert(pNode->pTNodeHeader->pData == pNode);
    //
    //      __cache_wr_lock(pCacheObj);
    //      doRemoveElemInTrashcan(pCacheObj, pNode->pTNodeHeader);
    //      __cache_unlock(pCacheObj);
    //
    //      ref = T_REF_DEC(pNode);
    //      assert(ref == 0);
    //
    //      doDestroyTrashcanElem(pCacheObj, pNode->pTNodeHeader);
    //    } else {
    //      ref = T_REF_DEC(pNode);
    //      assert(ref >= 0);
    //    }
H
Haojun Liao 已提交
507

H
Haojun Liao 已提交
508
    int32_t ref = T_REF_DEC(pNode);
H
Haojun Liao 已提交
509
    uDebug("cache:%s, key:%p, %p released, refcnt:%d, data in trashcan:%d", pCacheObj->name, key, p, ref, inTrashcan);
H
Haojun Liao 已提交
510 511
  }
}
H
Haojun Liao 已提交
512

H
Haojun Liao 已提交
513
typedef struct SHashTravSupp {
S
cache  
Shengliang Guan 已提交
514 515
  SCacheObj        *pCacheObj;
  int64_t           time;
A
AlexDuan 已提交
516
  __cache_trav_fn_t fp;
S
cache  
Shengliang Guan 已提交
517
  void             *param1;
H
Haojun Liao 已提交
518 519
} SHashTravSupp;

S
cache  
Shengliang Guan 已提交
520 521 522
static bool travHashTableEmptyFn(void *param, void *data) {
  SHashTravSupp *ps = (SHashTravSupp *)param;
  SCacheObj     *pCacheObj = ps->pCacheObj;
H
Haojun Liao 已提交
523

S
cache  
Shengliang Guan 已提交
524
  SCacheDataNode *pNode = *(SCacheDataNode **)data;
H
Haojun Liao 已提交
525 526 527

  if (T_REF_VAL_GET(pNode) == 0) {
    taosCacheReleaseNode(pCacheObj, pNode);
S
cache  
Shengliang Guan 已提交
528
  } else {  // do add to trashcan
H
Haojun Liao 已提交
529
    taosAddToTrashcan(pCacheObj, pNode);
530
  }
H
Haojun Liao 已提交
531 532 533

  // this node should be remove from hash table
  return false;
534 535
}

536
void taosCacheEmpty(SCacheObj *pCacheObj) {
H
Haojun Liao 已提交
537 538
  SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()};

H
Haojun Liao 已提交
539
//  taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup);
H
Haojun Liao 已提交
540
  taosTrashcanEmpty(pCacheObj, false);
541 542 543 544 545 546
}

void taosCacheCleanup(SCacheObj *pCacheObj) {
  if (pCacheObj == NULL) {
    return;
  }
547 548

  pCacheObj->deleting = 1;
549 550

  // wait for the refresh thread quit before destroying the cache object.
551
  // But in the dll, the child thread will be killed before atexit takes effect.
S
cache  
Shengliang Guan 已提交
552 553 554
  while (atomic_load_8(&pCacheObj->deleting) != 0) {
    if (refreshWorkerNormalStopped) break;
    if (refreshWorkerUnexpectedStopped) return;
555
    taosMsleep(50);
S
TD-2805  
Shengliang Guan 已提交
556
  }
557

H
Haojun Liao 已提交
558
  uInfo("cache:%s will be cleaned up", pCacheObj->name);
559 560 561
  doCleanupDataCache(pCacheObj);
}

H
Haojun Liao 已提交
562
SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration) {
563
  size_t totalSize = size + sizeof(SCacheDataNode) + keyLen;
564 565 566 567 568 569 570 571 572 573

  SCacheDataNode *pNewNode = calloc(1, totalSize);
  if (pNewNode == NULL) {
    uError("failed to allocate memory, reason:%s", strerror(errno));
    return NULL;
  }

  memcpy(pNewNode->data, pData, size);

  pNewNode->key = (char *)pNewNode + sizeof(SCacheDataNode) + size;
S
Shengliang Guan 已提交
574
  pNewNode->keySize = (uint16_t)keyLen;
575 576 577

  memcpy(pNewNode->key, key, keyLen);

S
cache  
Shengliang Guan 已提交
578 579 580 581 582
  pNewNode->addedTime = (uint64_t)taosGetTimestampMs();
  pNewNode->lifespan = duration;
  pNewNode->expireTime = pNewNode->addedTime + pNewNode->lifespan;
  pNewNode->signature = (uint64_t)pNewNode;
  pNewNode->size = (uint32_t)totalSize;
583 584 585 586

  return pNewNode;
}

H
Haojun Liao 已提交
587
void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
H
Haojun Liao 已提交
588
  if (pNode->inTrashcan) { /* node is already in trash */
589
    assert(pNode->pTNodeHeader != NULL && pNode->pTNodeHeader->pData == pNode);
590 591 592
    return;
  }

593
  __cache_wr_lock(pCacheObj);
594 595
  STrashElem *pElem = calloc(1, sizeof(STrashElem));
  pElem->pData = pNode;
596 597
  pElem->prev = NULL;
  pElem->next = NULL;
H
Haojun Liao 已提交
598
  pNode->inTrashcan = true;
599
  pNode->pTNodeHeader = pElem;
H
Haojun Liao 已提交
600

601 602 603 604 605 606 607
  pElem->next = pCacheObj->pTrash;
  if (pCacheObj->pTrash) {
    pCacheObj->pTrash->prev = pElem;
  }

  pCacheObj->pTrash = pElem;
  pCacheObj->numOfElemsInTrash++;
H
Haojun Liao 已提交
608
  __cache_unlock(pCacheObj);
609

610 611
  uDebug("cache:%s key:%p, %p move to trashcan, pTrashElem:%p, numOfElem in trashcan:%d", pCacheObj->name, pNode->key,
         pNode->data, pElem, pCacheObj->numOfElemsInTrash);
612 613
}

H
Haojun Liao 已提交
614
void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) {
615 616 617 618
  __cache_wr_lock(pCacheObj);

  if (pCacheObj->numOfElemsInTrash == 0) {
    if (pCacheObj->pTrash != NULL) {
H
Haojun Liao 已提交
619
      pCacheObj->pTrash = NULL;
S
cache  
Shengliang Guan 已提交
620 621
      uError("cache:%s, key:inconsistency data in cache, numOfElem in trashcan:%d", pCacheObj->name,
             pCacheObj->numOfElemsInTrash);
622 623 624 625
    }

    __cache_unlock(pCacheObj);
    return;
H
hjxilinx 已提交
626
  }
627

S
cache  
Shengliang Guan 已提交
628
  const char *stat[] = {"false", "true"};
H
Haojun Liao 已提交
629
  uDebug("cache:%s start to cleanup trashcan, numOfElem in trashcan:%d, free:%s", pCacheObj->name,
S
cache  
Shengliang Guan 已提交
630
         pCacheObj->numOfElemsInTrash, (force ? stat[1] : stat[0]));
631

H
Haojun Liao 已提交
632
  STrashElem *pElem = pCacheObj->pTrash;
633 634
  while (pElem) {
    T_REF_VAL_CHECK(pElem->pData);
H
Haojun Liao 已提交
635
    assert(pElem->next != pElem && pElem->prev != pElem);
636 637

    if (force || (T_REF_VAL_GET(pElem->pData) == 0)) {
S
cache  
Shengliang Guan 已提交
638 639
      uDebug("cache:%s, key:%p, %p removed from trashcan. numOfElem in trashcan:%d", pCacheObj->name, pElem->pData->key,
             pElem->pData->data, pCacheObj->numOfElemsInTrash - 1);
640

H
Haojun Liao 已提交
641 642 643
      doRemoveElemInTrashcan(pCacheObj, pElem);
      doDestroyTrashcanElem(pCacheObj, pElem);
      pElem = pCacheObj->pTrash;
644 645 646 647 648 649 650 651 652
    } else {
      pElem = pElem->next;
    }
  }

  __cache_unlock(pCacheObj);
}

void doCleanupDataCache(SCacheObj *pCacheObj) {
S
cache  
Shengliang Guan 已提交
653 654
  //  SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()};
  //  taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup);
655

H
Haojun Liao 已提交
656 657
  // todo memory leak if there are object with refcount greater than 0 in hash table?
  taosHashCleanup(pCacheObj->pHashTable);
658
  taosTrashcanEmpty(pCacheObj, true);
H
Haojun Liao 已提交
659

660
  __cache_lock_destroy(pCacheObj);
S
cache  
Shengliang Guan 已提交
661

S
TD-1848  
Shengliang Guan 已提交
662
  tfree(pCacheObj->name);
663 664
  memset(pCacheObj, 0, sizeof(SCacheObj));
  free(pCacheObj);
665
}
666

S
cache  
Shengliang Guan 已提交
667 668 669
bool travHashTableFn(void *param, void *data) {
  SHashTravSupp *ps = (SHashTravSupp *)param;
  SCacheObj     *pCacheObj = ps->pCacheObj;
670

S
cache  
Shengliang Guan 已提交
671
  SCacheDataNode *pNode = *(SCacheDataNode **)data;
S
Shengliang Guan 已提交
672
  if ((int64_t)pNode->expireTime < ps->time && T_REF_VAL_GET(pNode) <= 0) {
H
Haojun Liao 已提交
673
    taosCacheReleaseNode(pCacheObj, pNode);
H
Haojun Liao 已提交
674

H
Haojun Liao 已提交
675 676 677
    // this node should be remove from hash table
    return false;
  }
678

H
Haojun Liao 已提交
679
  if (ps->fp) {
A
AlexDuan 已提交
680
    (ps->fp)(pNode->data, ps->param1);
681 682
  }

H
Haojun Liao 已提交
683 684 685 686
  // do not remove element in hash table
  return true;
}

S
cache  
Shengliang Guan 已提交
687
static void doCacheRefresh(SCacheObj *pCacheObj, int64_t time, __cache_trav_fn_t fp, void *param1) {
H
Haojun Liao 已提交
688
  assert(pCacheObj != NULL);
689

A
AlexDuan 已提交
690
  SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = fp, .time = time, .param1 = param1};
S
cache  
Shengliang Guan 已提交
691
  //  taosHashCondTraverse(pCacheObj->pHashTable, travHashTableFn, &sup);
692 693
}

694
void taosCacheRefreshWorkerUnexpectedStopped(void) {
S
cache  
Shengliang Guan 已提交
695 696
  if (!refreshWorkerNormalStopped) {
    refreshWorkerUnexpectedStopped = true;
697 698 699
  }
}

S
cache  
Shengliang Guan 已提交
700
void *taosCacheTimedRefresh(void *handle) {
701 702
  assert(pCacheArrayList != NULL);
  uDebug("cache refresh thread starts");
703

H
Haojun Liao 已提交
704
  setThreadName("cacheRefresh");
705

S
cache  
Shengliang Guan 已提交
706 707
  const int32_t SLEEP_DURATION = 500;  // 500 ms
  int64_t       count = 0;
708
  atexit(taosCacheRefreshWorkerUnexpectedStopped);
709

S
cache  
Shengliang Guan 已提交
710
  while (1) {
711
    taosMsleep(SLEEP_DURATION);
712 713 714
    if (stopRefreshWorker) {
      goto _end;
    }
H
Haojun Liao 已提交
715

716 717 718
    pthread_mutex_lock(&guard);
    size_t size = taosArrayGetSize(pCacheArrayList);
    pthread_mutex_unlock(&guard);
719

720
    count += 1;
721

S
cache  
Shengliang Guan 已提交
722
    for (int32_t i = 0; i < size; ++i) {
723
      pthread_mutex_lock(&guard);
S
cache  
Shengliang Guan 已提交
724
      SCacheObj *pCacheObj = taosArrayGetP(pCacheArrayList, i);
725

726
      if (pCacheObj == NULL) {
727 728 729
        uError("object is destroyed. ignore and try next");
        pthread_mutex_unlock(&guard);
        continue;
730
      }
731

732 733
      // check if current cache object will be deleted every 500ms.
      if (pCacheObj->deleting) {
734 735 736
        taosArrayRemove(pCacheArrayList, i);
        size = taosArrayGetSize(pCacheArrayList);

S
cache  
Shengliang Guan 已提交
737 738
        uDebug("%s is destroying, remove it from refresh list, remain cache obj:%" PRIzu, pCacheObj->name, size);
        pCacheObj->deleting = 0;  // reset the deleting flag to enable pCacheObj to continue releasing resources.
739

740 741
        pthread_mutex_unlock(&guard);
        continue;
742
      }
743

744 745
      pthread_mutex_unlock(&guard);

746 747 748 749 750 751 752 753 754 755 756 757 758 759 760
      if ((count % pCacheObj->checkTick) != 0) {
        continue;
      }

      size_t elemInHash = taosHashGetSize(pCacheObj->pHashTable);
      if (elemInHash + pCacheObj->numOfElemsInTrash == 0) {
        continue;
      }

      uDebug("%s refresh thread scan", pCacheObj->name);
      pCacheObj->statistics.refreshCount++;

      // refresh data in hash table
      if (elemInHash > 0) {
        int64_t now = taosGetTimestampMs();
A
AlexDuan 已提交
761
        doCacheRefresh(pCacheObj, now, NULL, NULL);
762 763 764 765
      }

      taosTrashcanEmpty(pCacheObj, false);
    }
766 767
  }

S
cache  
Shengliang Guan 已提交
768
_end:
769
  taosArrayDestroy(pCacheArrayList);
H
Haojun Liao 已提交
770 771 772

  pCacheArrayList = NULL;
  pthread_mutex_destroy(&guard);
S
cache  
Shengliang Guan 已提交
773
  refreshWorkerNormalStopped = true;
H
Haojun Liao 已提交
774

775
  uDebug("cache refresh thread quits");
776
  return NULL;
dengyihao's avatar
dengyihao 已提交
777
}
778

S
cache  
Shengliang Guan 已提交
779
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void *param1) {
780 781 782 783 784
  if (pCacheObj == NULL) {
    return;
  }

  int64_t now = taosGetTimestampMs();
A
AlexDuan 已提交
785
  doCacheRefresh(pCacheObj, now, fp, param1);
786
}
787

S
cache  
Shengliang Guan 已提交
788
void taosStopCacheRefreshWorker(void) { stopRefreshWorker = true; }