tcache.c 31.5 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#define _DEFAULT_SOURCE
S
cache  
Shengliang Guan 已提交
17
#include "tcache.h"
18
#include "taoserror.h"
wafwerar's avatar
wafwerar 已提交
19
#include "osThread.h"
S
log  
Shengliang Guan 已提交
20
#include "tlog.h"
H
hzcheng 已提交
21 22
#include "tutil.h"

L
Liu Jicong 已提交
23 24
#define CACHE_MAX_CAPACITY     1024 * 1024 * 16
#define CACHE_DEFAULT_CAPACITY 1024 * 4
H
Haojun Liao 已提交
25

L
Liu Jicong 已提交
26
static TdThread      cacheRefreshWorker = {0};
wafwerar's avatar
wafwerar 已提交
27
static TdThreadOnce  cacheThreadInit = PTHREAD_ONCE_INIT;
wafwerar's avatar
wafwerar 已提交
28
static TdThreadMutex guard = TD_PTHREAD_MUTEX_INITIALIZER;
L
Liu Jicong 已提交
29 30 31 32
static SArray       *pCacheArrayList = NULL;
static bool          stopRefreshWorker = false;
static bool          refreshWorkerNormalStopped = false;
static bool          refreshWorkerUnexpectedStopped = false;
33 34 35 36 37

typedef struct SCacheNode {
  uint64_t           addedTime;   // the added time when this element is added or updated into cache
  uint64_t           lifespan;    // life duration when this element should be remove from cache
  int64_t            expireTime;  // expire time
wafwerar's avatar
wafwerar 已提交
38
  void*              signature;
39
  struct STrashElem *pTNodeHeader;    // point to trash node head
L
Liu Jicong 已提交
40
  uint16_t           keyLen : 15;     // max key size: 32kb
41 42 43 44 45 46 47 48 49 50
  bool               inTrashcan : 1;  // denote if it is in trash or not
  uint32_t           size;            // allocated size for current SCacheNode
  uint32_t           dataLen;
  T_REF_DECLARE()
  struct SCacheNode *pNext;
  char              *key;
  char              *data;
} SCacheNode;

typedef struct SCacheEntry {
L
Liu Jicong 已提交
51 52
  int32_t     num;    // number of elements in current entry
  SRWLatch    latch;  // entry latch
53 54 55
  SCacheNode *next;
} SCacheEntry;

56
struct STrashElem {
57 58 59
  struct STrashElem *prev;
  struct STrashElem *next;
  SCacheNode        *pData;
60
};
61

62
struct SCacheIter {
H
Haojun Liao 已提交
63 64 65 66 67
  SCacheObj   *pCacheObj;
  SCacheNode **pCurrent;
  int32_t      entryIndex;
  int32_t      index;
  int32_t      numOfObj;
68
};
H
Haojun Liao 已提交
69

70 71 72 73 74 75 76 77 78
/*
 * to accommodate the old data which has the same key value of new one in hashList
 * when an new node is put into cache, if an existed one with the same key:
 * 1. if the old one does not be referenced, update it.
 * 2. otherwise, move the old one to pTrash, addedTime the new one.
 *
 * when the node in pTrash does not be referenced, it will be release at the expired expiredTime
 */
struct SCacheObj {
L
Liu Jicong 已提交
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
  int64_t      sizeInBytes;  // total allocated buffer in this hash table, SCacheObj is not included.
  int64_t      refreshTime;
  char        *name;
  SCacheStatis statistics;

  SCacheEntry      *pEntryList;
  size_t            capacity;    // number of slots
  size_t            numOfElems;  // number of elements in cache
  _hash_fn_t        hashFp;      // hash function
  __cache_free_fn_t freeFp;

  uint32_t    numOfElemsInTrash;  // number of element in trash
  STrashElem *pTrash;

  uint8_t  deleting;  // set the deleting flag to stop refreshing ASAP.
  TdThread refreshWorker;
  bool     extendLifespan;  // auto extend life span when one item is accessed.
  int64_t  checkTick;       // tick used to record the check times of the refresh threads
97
#if defined(LINUX)
wafwerar's avatar
wafwerar 已提交
98
  TdThreadRwlock lock;
99
#else
wafwerar's avatar
wafwerar 已提交
100
  TdThreadMutex lock;
101 102 103 104 105 106 107 108 109 110 111
#endif
};

typedef struct SCacheObjTravSup {
  SCacheObj        *pCacheObj;
  int64_t           time;
  __cache_trav_fn_t fp;
  void             *param1;
} SCacheObjTravSup;

static FORCE_INLINE void __trashcan_wr_lock(SCacheObj *pCacheObj) {
112
#if defined(LINUX)
wafwerar's avatar
wafwerar 已提交
113
  taosThreadRwlockWrlock(&pCacheObj->lock);
114
#else
wafwerar's avatar
wafwerar 已提交
115
  taosThreadMutexLock(&pCacheObj->lock);
116 117 118
#endif
}

119
static FORCE_INLINE void __trashcan_unlock(SCacheObj *pCacheObj) {
120
#if defined(LINUX)
wafwerar's avatar
wafwerar 已提交
121
  taosThreadRwlockUnlock(&pCacheObj->lock);
122
#else
wafwerar's avatar
wafwerar 已提交
123
  taosThreadMutexUnlock(&pCacheObj->lock);
124 125 126
#endif
}

127
static FORCE_INLINE int32_t __trashcan_lock_init(SCacheObj *pCacheObj) {
128
#if defined(LINUX)
wafwerar's avatar
wafwerar 已提交
129
  return taosThreadRwlockInit(&pCacheObj->lock, NULL);
130
#else
wafwerar's avatar
wafwerar 已提交
131
  return taosThreadMutexInit(&pCacheObj->lock, NULL);
132 133 134
#endif
}

135
static FORCE_INLINE void __trashcan_lock_destroy(SCacheObj *pCacheObj) {
136
#if defined(LINUX)
wafwerar's avatar
wafwerar 已提交
137
  taosThreadRwlockDestroy(&pCacheObj->lock);
138
#else
wafwerar's avatar
wafwerar 已提交
139
  taosThreadMutexDestroy(&pCacheObj->lock);
140 141 142
#endif
}

143 144 145 146 147 148 149 150 151 152
/**
 * do cleanup the taos cache
 * @param pCacheObj
 */
static void doCleanupDataCache(SCacheObj *pCacheObj);

/**
 * refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime
 * @param handle   Cache object handle
 */
S
cache  
Shengliang Guan 已提交
153
static void *taosCacheTimedRefresh(void *handle);
154

H
Haojun Liao 已提交
155
static void doInitRefreshThread(void) {
156 157
  pCacheArrayList = taosArrayInit(4, POINTER_BYTES);

wafwerar's avatar
wafwerar 已提交
158 159 160
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
  taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);
161

wafwerar's avatar
wafwerar 已提交
162 163
  taosThreadCreate(&cacheRefreshWorker, &thattr, taosCacheTimedRefresh, NULL);
  taosThreadAttrDestroy(&thattr);
164 165
}

wafwerar's avatar
wafwerar 已提交
166 167
TdThread doRegisterCacheObj(SCacheObj *pCacheObj) {
  taosThreadOnce(&cacheThreadInit, doInitRefreshThread);
168

wafwerar's avatar
wafwerar 已提交
169
  taosThreadMutexLock(&guard);
170
  taosArrayPush(pCacheArrayList, &pCacheObj);
wafwerar's avatar
wafwerar 已提交
171
  taosThreadMutexUnlock(&guard);
172 173 174 175

  return cacheRefreshWorker;
}

H
hzcheng 已提交
176 177 178 179 180 181
/**
 * @param key      key of object for hash, usually a null-terminated string
 * @param keyLen   length of key
 * @param pData    actually data. required a consecutive memory block, no pointer is allowed
 *                 in pData. Pointer copy causes memory access error.
 * @param size     size of block
182
 * @param lifespan total survial expiredTime from now
183
 * @return         SCacheNode
H
hzcheng 已提交
184
 */
185
static SCacheNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size,
L
Liu Jicong 已提交
186
                                       uint64_t duration);
H
hzcheng 已提交
187 188

/**
189
 * addedTime object node into trash, and this object is closed for referencing if it is addedTime to trash
H
hzcheng 已提交
190
 * It will be removed until the pNode->refCount == 0
191
 * @param pCacheObj    Cache object
H
hzcheng 已提交
192 193
 * @param pNode   Cache slot object
 */
194
static void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheNode *pNode);
195

H
hzcheng 已提交
196 197 198
/**
 * remove nodes in trash with refCount == 0 in cache
 * @param pNode
199
 * @param pCacheObj
H
hzcheng 已提交
200 201 202
 * @param force   force model, if true, remove data in trash without check refcount.
 *                may cause corruption. So, forece model only applys before cache is closed
 */
H
Haojun Liao 已提交
203
static void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force);
H
hzcheng 已提交
204 205 206

/**
 * release node
207
 * @param pCacheObj      cache object
H
Haojun Liao 已提交
208
 * @param pNode          data node
H
hzcheng 已提交
209
 */
210
static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheNode *pNode) {
wafwerar's avatar
wafwerar 已提交
211
  if (pNode->signature != pNode) {
S
slguan 已提交
212
    uError("key:%s, %p data is invalid, or has been released", pNode->key, pNode);
H
hzcheng 已提交
213 214
    return;
  }
215

216
  atomic_sub_fetch_64(&pCacheObj->sizeInBytes, pNode->size);
H
Haojun Liao 已提交
217

H
Haojun Liao 已提交
218
  uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, total num:%d size:%" PRId64 "bytes",
219
         pCacheObj->name, pNode->key, pNode->data, pNode->size, (int)pCacheObj->numOfElems - 1, pCacheObj->sizeInBytes);
H
Haojun Liao 已提交
220 221 222 223

  if (pCacheObj->freeFp) {
    pCacheObj->freeFp(pNode->data);
  }
224

wafwerar's avatar
wafwerar 已提交
225
  taosMemoryFree(pNode);
H
hzcheng 已提交
226 227
}

S
cache  
Shengliang Guan 已提交
228
static FORCE_INLINE STrashElem *doRemoveElemInTrashcan(SCacheObj *pCacheObj, STrashElem *pElem) {
wafwerar's avatar
wafwerar 已提交
229
  if (pElem->pData->signature != pElem->pData) {
H
Haojun Liao 已提交
230
    uWarn("key:sig:0x%" PRIx64 " %p data has been released, ignore", pElem->pData->signature, pElem->pData);
H
Haojun Liao 已提交
231
    return NULL;
H
Haojun Liao 已提交
232 233
  }

S
cache  
Shengliang Guan 已提交
234
  STrashElem *next = pElem->next;
H
Haojun Liao 已提交
235

H
Haojun Liao 已提交
236 237 238
  pCacheObj->numOfElemsInTrash--;
  if (pElem->prev) {
    pElem->prev->next = pElem->next;
S
cache  
Shengliang Guan 已提交
239
  } else {  // pnode is the header, update header
H
Haojun Liao 已提交
240 241 242
    pCacheObj->pTrash = pElem->next;
  }

H
Haojun Liao 已提交
243 244
  if (next) {
    next->prev = pElem->prev;
H
Haojun Liao 已提交
245
  }
H
Haojun Liao 已提交
246 247 248 249 250 251

  if (pCacheObj->numOfElemsInTrash == 0) {
    assert(pCacheObj->pTrash == NULL);
  }

  return next;
H
Haojun Liao 已提交
252 253
}

S
cache  
Shengliang Guan 已提交
254
static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj *pCacheObj, STrashElem *pElem) {
H
Haojun Liao 已提交
255 256 257 258
  if (pCacheObj->freeFp) {
    pCacheObj->freeFp(pElem->pData->data);
  }

wafwerar's avatar
wafwerar 已提交
259 260
  taosMemoryFree(pElem->pData);
  taosMemoryFree(pElem);
H
Haojun Liao 已提交
261
}
H
hzcheng 已提交
262

263 264 265 266 267 268
static void pushfrontNodeInEntryList(SCacheEntry *pEntry, SCacheNode *pNode) {
  assert(pNode != NULL && pEntry != NULL);

  pNode->pNext = pEntry->next;
  pEntry->next = pNode;
  pEntry->num += 1;
D
dapan1121 已提交
269
  ASSERT((pEntry->next && pEntry->num > 0) || (NULL == pEntry->next && pEntry->num == 0));
270 271
}

L
Liu Jicong 已提交
272
static void removeNodeInEntryList(SCacheEntry *pe, SCacheNode *prev, SCacheNode *pNode) {
273 274 275 276 277 278 279
  if (prev == NULL) {
    ASSERT(pe->next == pNode);
    pe->next = pNode->pNext;
  } else {
    prev->pNext = pNode->pNext;
  }

H
Haojun Liao 已提交
280
  pNode->pNext = NULL;
281
  pe->num -= 1;
D
dapan1121 已提交
282
  ASSERT((pe->next && pe->num > 0) || (NULL == pe->next && pe->num == 0));  
283 284
}

L
Liu Jicong 已提交
285
static FORCE_INLINE SCacheEntry *doFindEntry(SCacheObj *pCacheObj, const void *key, size_t keyLen) {
286 287 288 289 290
  uint32_t hashVal = (*pCacheObj->hashFp)(key, keyLen);
  int32_t  slot = hashVal % pCacheObj->capacity;
  return &pCacheObj->pEntryList[slot];
}

L
Liu Jicong 已提交
291 292
static FORCE_INLINE SCacheNode *doSearchInEntryList(SCacheEntry *pe, const void *key, size_t keyLen,
                                                    SCacheNode **prev) {
293 294 295 296 297 298 299 300 301 302 303 304
  SCacheNode *pNode = pe->next;
  while (pNode) {
    if ((pNode->keyLen == keyLen) && memcmp(pNode->key, key, keyLen) == 0) {
      break;
    }
    *prev = pNode;
    pNode = pNode->pNext;
  }

  return pNode;
}

L
Liu Jicong 已提交
305
static bool doRemoveExpiredFn(void *param, SCacheNode *pNode) {
H
Haojun Liao 已提交
306
  SCacheObjTravSup *ps = (SCacheObjTravSup *)param;
L
Liu Jicong 已提交
307
  SCacheObj        *pCacheObj = ps->pCacheObj;
H
Haojun Liao 已提交
308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325

  if ((int64_t)pNode->expireTime < ps->time && T_REF_VAL_GET(pNode) <= 0) {
    taosCacheReleaseNode(pCacheObj, pNode);

    // this node should be remove from hash table
    return false;
  }

  if (ps->fp) {
    (ps->fp)(pNode->data, ps->param1);
  }

  // do not remove element in hash table
  return true;
}

static bool doRemoveNodeFn(void *param, SCacheNode *pNode) {
  SCacheObjTravSup *ps = (SCacheObjTravSup *)param;
L
Liu Jicong 已提交
326
  SCacheObj        *pCacheObj = ps->pCacheObj;
H
Haojun Liao 已提交
327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352

  if (T_REF_VAL_GET(pNode) == 0) {
    taosCacheReleaseNode(pCacheObj, pNode);
  } else {  // do add to trashcan
    taosAddToTrashcan(pCacheObj, pNode);
  }

  // this node should be remove from hash table
  return false;
}

static FORCE_INLINE int32_t getCacheCapacity(int32_t length) {
  int32_t len = 0;
  if (length < CACHE_DEFAULT_CAPACITY) {
    len = CACHE_DEFAULT_CAPACITY;
    return len;
  } else if (length > CACHE_MAX_CAPACITY) {
    len = CACHE_MAX_CAPACITY;
    return len;
  }

  len = CACHE_DEFAULT_CAPACITY;
  while (len < length && len < CACHE_MAX_CAPACITY) {
    len = (len << 1u);
  }

L
Liu Jicong 已提交
353
  return len > CACHE_MAX_CAPACITY ? CACHE_MAX_CAPACITY : len;
H
Haojun Liao 已提交
354 355
}

L
Liu Jicong 已提交
356
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInMs, bool extendLifespan, __cache_free_fn_t fn,
S
cache  
Shengliang Guan 已提交
357 358
                         const char *cacheName) {
  const int32_t SLEEP_DURATION = 500;  // 500 ms
359

L
Liu Jicong 已提交
360
  if (refreshTimeInMs <= 0) {
H
hzcheng 已提交
361 362
    return NULL;
  }
S
cache  
Shengliang Guan 已提交
363

wafwerar's avatar
wafwerar 已提交
364
  SCacheObj *pCacheObj = (SCacheObj *)taosMemoryCalloc(1, sizeof(SCacheObj));
365
  if (pCacheObj == NULL) {
S
slguan 已提交
366
    uError("failed to allocate memory, reason:%s", strerror(errno));
H
hzcheng 已提交
367 368
    return NULL;
  }
S
cache  
Shengliang Guan 已提交
369

H
Haojun Liao 已提交
370 371
  // TODO add the auto extend procedure
  pCacheObj->capacity = 4096;
wafwerar's avatar
wafwerar 已提交
372
  pCacheObj->pEntryList = taosMemoryCalloc(pCacheObj->capacity, sizeof(SCacheEntry));
373
  if (pCacheObj->pEntryList == NULL) {
wafwerar's avatar
wafwerar 已提交
374
    taosMemoryFree(pCacheObj);
S
slguan 已提交
375
    uError("failed to allocate memory, reason:%s", strerror(errno));
H
hzcheng 已提交
376 377
    return NULL;
  }
S
cache  
Shengliang Guan 已提交
378

H
Haojun Liao 已提交
379
  // set free cache node callback function
L
Liu Jicong 已提交
380 381 382 383
  pCacheObj->hashFp = taosGetDefaultHashFunction(keyType);
  pCacheObj->freeFp = fn;
  pCacheObj->refreshTime = refreshTimeInMs;
  pCacheObj->checkTick = pCacheObj->refreshTime / SLEEP_DURATION;
384
  pCacheObj->extendLifespan = extendLifespan;  // the TTL after the last access
385

386
  if (__trashcan_lock_init(pCacheObj) != 0) {
wafwerar's avatar
wafwerar 已提交
387 388
    taosMemoryFreeClear(pCacheObj->pEntryList);
    taosMemoryFree(pCacheObj);
S
cache  
Shengliang Guan 已提交
389

S
slguan 已提交
390
    uError("failed to init lock, reason:%s", strerror(errno));
H
hzcheng 已提交
391 392
    return NULL;
  }
393

394
  pCacheObj->name = strdup(cacheName);
395
  doRegisterCacheObj(pCacheObj);
396
  return pCacheObj;
397
}
H
hzcheng 已提交
398

S
cache  
Shengliang Guan 已提交
399 400
void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize,
                   int32_t durationMS) {
401
  if (pCacheObj == NULL || pCacheObj->pEntryList == NULL || pCacheObj->deleting == 1) {
402 403
    return NULL;
  }
H
Haojun Liao 已提交
404

405
  SCacheNode *pNode1 = taosCreateCacheNode(key, keyLen, pData, dataSize, durationMS);
H
Haojun Liao 已提交
406 407 408 409 410
  if (pNode1 == NULL) {
    uError("cache:%s, key:%p, failed to added into cache, out of memory", pCacheObj->name, key);
    return NULL;
  }

H
Haojun Liao 已提交
411
  T_REF_INC(pNode1);
H
Haojun Liao 已提交
412

413
  SCacheEntry *pe = doFindEntry(pCacheObj, key, keyLen);
414

415
  taosWLockLatch(&pe->latch);
416

L
Liu Jicong 已提交
417 418
  SCacheNode *prev = NULL;
  SCacheNode *pNode = doSearchInEntryList(pe, key, keyLen, &prev);
H
Haojun Liao 已提交
419

420 421
  if (pNode == NULL) {
    pushfrontNodeInEntryList(pe, pNode1);
wafwerar's avatar
wafwerar 已提交
422 423
    atomic_add_fetch_ptr(&pCacheObj->numOfElems, 1);
    atomic_add_fetch_ptr(&pCacheObj->sizeInBytes, pNode1->size);
424 425 426 427 428 429 430
    uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64
           ", totalNum:%d sizeInBytes:%" PRId64 "bytes size:%" PRId64 "bytes",
           pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime, (int32_t)pCacheObj->numOfElems,
           pCacheObj->sizeInBytes, (int64_t)dataSize);
  } else {  // duplicated key exists
    // move current node to trashcan
    removeNodeInEntryList(pe, prev, pNode);
H
Haojun Liao 已提交
431

432 433 434
    if (T_REF_VAL_GET(pNode) == 0) {
      if (pCacheObj->freeFp) {
        pCacheObj->freeFp(pNode->data);
H
Haojun Liao 已提交
435
      }
436 437

      atomic_sub_fetch_64(&pCacheObj->sizeInBytes, pNode->size);
wafwerar's avatar
wafwerar 已提交
438
      taosMemoryFreeClear(pNode);
439 440 441
    } else {
      taosAddToTrashcan(pCacheObj, pNode);
      uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode1->data, pNode->data);
H
Haojun Liao 已提交
442
    }
443 444 445 446 447 448 449

    pushfrontNodeInEntryList(pe, pNode1);
    atomic_add_fetch_64(&pCacheObj->sizeInBytes, pNode1->size);
    uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64
           ", totalNum:%d sizeInBytes:%" PRId64 "bytes size:%" PRId64 "bytes",
           pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime, (int32_t)pCacheObj->numOfElems,
           pCacheObj->sizeInBytes, (int64_t)dataSize);
H
Haojun Liao 已提交
450 451
  }

452
  taosWUnLockLatch(&pe->latch);
H
Haojun Liao 已提交
453
  return pNode1->data;
H
hzcheng 已提交
454 455
}

H
Haojun Liao 已提交
456
void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) {
457 458 459 460
  if (pCacheObj == NULL || pCacheObj->deleting == 1) {
    return NULL;
  }

461
  if (pCacheObj->numOfElems == 0) {
wafwerar's avatar
wafwerar 已提交
462
    atomic_add_fetch_64(&pCacheObj->statistics.missCount, 1);
463
    return NULL;
464
  }
H
Haojun Liao 已提交
465

L
Liu Jicong 已提交
466
  SCacheNode  *prev = NULL;
467 468 469
  SCacheEntry *pe = doFindEntry(pCacheObj, key, keyLen);

  taosRLockLatch(&pe->latch);
S
Shengliang Guan 已提交
470

L
Liu Jicong 已提交
471
  SCacheNode *pNode = doSearchInEntryList(pe, key, keyLen, &prev);
472 473 474 475 476 477
  if (pNode != NULL) {
    int32_t ref = T_REF_INC(pNode);
    ASSERT(ref > 0);
  }

  taosRUnLockLatch(&pe->latch);
S
Shengliang Guan 已提交
478

479
  void *pData = (pNode != NULL) ? pNode->data : NULL;
S
Shengliang Guan 已提交
480
  if (pData != NULL) {
wafwerar's avatar
wafwerar 已提交
481
    atomic_add_fetch_64(&pCacheObj->statistics.hitCount, 1);
S
cache  
Shengliang Guan 已提交
482
    uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData,
483
           T_REF_VAL_GET(pNode));
484
  } else {
wafwerar's avatar
wafwerar 已提交
485
    atomic_add_fetch_64(&pCacheObj->statistics.missCount, 1);
H
Haojun Liao 已提交
486
    uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
487
  }
S
Shengliang Guan 已提交
488

wafwerar's avatar
wafwerar 已提交
489
  atomic_add_fetch_64(&pCacheObj->statistics.totalAccess, 1);
S
Shengliang Guan 已提交
490
  return pData;
491 492
}

493 494
void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
  if (pCacheObj == NULL || data == NULL) return NULL;
S
cache  
Shengliang Guan 已提交
495

496
  SCacheNode *ptNode = (SCacheNode *)((char *)data - sizeof(SCacheNode));
wafwerar's avatar
wafwerar 已提交
497
  if (ptNode->signature != ptNode) {
H
Haojun Liao 已提交
498
    uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode);
499 500
    return NULL;
  }
501

502
  int32_t ref = T_REF_INC(ptNode);
H
Haojun Liao 已提交
503
  uDebug("cache:%s, data: %p acquired by data in cache, refcnt:%d", pCacheObj->name, ptNode->data, ref);
H
Haojun Liao 已提交
504

505 506 507 508 509
  // the data if referenced by at least one object, so the reference count must be greater than the value of 2.
  assert(ref >= 2);
  return data;
}

510
void *taosCacheTransferData(SCacheObj *pCacheObj, void **data) {
H
Haojun Liao 已提交
511
  if (pCacheObj == NULL || data == NULL || (*data) == NULL) return NULL;
S
cache  
Shengliang Guan 已提交
512

513
  SCacheNode *ptNode = (SCacheNode *)((char *)(*data) - sizeof(SCacheNode));
wafwerar's avatar
wafwerar 已提交
514
  if (ptNode->signature != ptNode) {
H
Haojun Liao 已提交
515
    uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode);
516 517
    return NULL;
  }
S
cache  
Shengliang Guan 已提交
518

519 520
  assert(T_REF_VAL_GET(ptNode) >= 1);
  char *d = *data;
S
cache  
Shengliang Guan 已提交
521

522 523 524
  // clear its reference to old area
  *data = NULL;
  return d;
H
hzcheng 已提交
525
}
526

527
void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
H
Haojun Liao 已提交
528
  if (pCacheObj == NULL) {
H
Haojun Liao 已提交
529 530 531
    return;
  }

H
Haojun Liao 已提交
532
  if ((*data) == NULL) {
H
Haojun Liao 已提交
533
    uError("cache:%s, NULL data to release", pCacheObj->name);
534 535
    return;
  }
H
Haojun Liao 已提交
536 537 538 539

  // The operation of removal from hash table and addition to trashcan is not an atomic operation,
  // therefore the check for the empty of both the hash table and the trashcan has a race condition.
  // It happens when there is only one object in the cache, and two threads which has referenced this object
H
Haojun Liao 已提交
540
  // start to free the it simultaneously [TD-1569].
541
  SCacheNode *pNode = (SCacheNode *)((char *)(*data) - sizeof(SCacheNode));
wafwerar's avatar
wafwerar 已提交
542
  if (pNode->signature != pNode) {
H
Haojun Liao 已提交
543
    uError("cache:%s, %p, release invalid cache data", pCacheObj->name, pNode);
544 545
    return;
  }
546

547
  *data = NULL;
548

549
  // note: extend lifespan before dec ref count
H
Haojun Liao 已提交
550
  bool inTrashcan = pNode->inTrashcan;
H
Haojun Liao 已提交
551

H
Haojun Liao 已提交
552
  if (pCacheObj->extendLifespan && (!inTrashcan) && (!_remove)) {
553
    atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs());
S
cache  
Shengliang Guan 已提交
554
    uDebug("cache:%s, data:%p extend expire time: %" PRId64, pCacheObj->name, pNode->data, pNode->expireTime);
555
  }
H
Haojun Liao 已提交
556

H
Haojun Liao 已提交
557 558
  if (_remove) {
    // NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
S
cache  
Shengliang Guan 已提交
559 560
    char *key = pNode->key;
    char *d = pNode->data;
H
Haojun Liao 已提交
561

562
    int32_t ref = T_REF_VAL_GET(pNode);
H
Haojun Liao 已提交
563
    uDebug("cache:%s, key:%p, %p is released, refcnt:%d, in trashcan:%d", pCacheObj->name, key, d, ref - 1, inTrashcan);
H
Haojun Liao 已提交
564 565

    /*
S
cache  
Shengliang Guan 已提交
566 567
     * If it is not referenced by other users, remove it immediately. Otherwise move this node to trashcan wait for all
     * users releasing this resources.
H
Haojun Liao 已提交
568 569 570 571
     *
     * NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread
     * that tries to do the same thing.
     */
H
Haojun Liao 已提交
572
    if (inTrashcan) {
H
Haojun Liao 已提交
573
      ref = T_REF_VAL_GET(pNode);
574

H
Haojun Liao 已提交
575 576 577
      if (ref == 1) {
        // If it is the last ref, remove it from trashcan linked-list first, and then destroy it.Otherwise, it may be
        // destroyed by refresh worker if decrease ref count before removing it from linked-list.
H
Haojun Liao 已提交
578
        assert(pNode->pTNodeHeader->pData == pNode);
H
Haojun Liao 已提交
579

580
        __trashcan_wr_lock(pCacheObj);
H
Haojun Liao 已提交
581
        doRemoveElemInTrashcan(pCacheObj, pNode->pTNodeHeader);
582
        __trashcan_unlock(pCacheObj);
H
Haojun Liao 已提交
583

H
Haojun Liao 已提交
584 585 586
        ref = T_REF_DEC(pNode);
        assert(ref == 0);

H
Haojun Liao 已提交
587
        doDestroyTrashcanElem(pCacheObj, pNode->pTNodeHeader);
H
Haojun Liao 已提交
588 589
      } else {
        ref = T_REF_DEC(pNode);
H
Haojun Liao 已提交
590
        assert(ref >= 0);
H
Haojun Liao 已提交
591 592
      }
    } else {
593 594
      // NOTE: remove it from hash in the first place, otherwise, the pNode may have been released by other thread
      // when reaches here.
L
Liu Jicong 已提交
595
      SCacheNode  *prev = NULL;
596 597 598
      SCacheEntry *pe = doFindEntry(pCacheObj, pNode->key, pNode->keyLen);

      taosWLockLatch(&pe->latch);
599
      ref = T_REF_DEC(pNode);
H
Haojun Liao 已提交
600

601 602 603 604 605
      SCacheNode *p = doSearchInEntryList(pe, pNode->key, pNode->keyLen, &prev);

      if (p != NULL) {
        // successfully remove from hash table, if failed, this node must have been move to trash already, do nothing.
        // note that the remove operation can be executed only once.
H
Haojun Liao 已提交
606
        if (p != pNode) {
S
cache  
Shengliang Guan 已提交
607
          uDebug(
608 609 610
              "cache:%s, key:%p, a new entry:%p found, refcnt:%d, prev entry:%p, refcnt:%d has been removed by "
              "others already, prev must in trashcan",
              pCacheObj->name, pNode->key, p->data, T_REF_VAL_GET(p), pNode->data, T_REF_VAL_GET(pNode));
H
Haojun Liao 已提交
611

612
          assert(p->pTNodeHeader == NULL && pNode->pTNodeHeader != NULL);
H
Haojun Liao 已提交
613
        } else {
614
          removeNodeInEntryList(pe, prev, p);
H
Haojun Liao 已提交
615
          uDebug("cache:%s, key:%p, %p successfully removed from hash table, refcnt:%d", pCacheObj->name, pNode->key,
H
Haojun Liao 已提交
616 617 618
                 pNode->data, ref);
          if (ref > 0) {
            assert(pNode->pTNodeHeader == NULL);
H
Haojun Liao 已提交
619
            taosAddToTrashcan(pCacheObj, pNode);
H
Haojun Liao 已提交
620
          } else {  // ref == 0
621
            atomic_sub_fetch_64(&pCacheObj->sizeInBytes, pNode->size);
H
Haojun Liao 已提交
622

623
            int32_t size = (int32_t)pCacheObj->numOfElems;
H
Haojun Liao 已提交
624
            uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, totalNum:%d size:%" PRId64 "bytes",
625
                   pCacheObj->name, pNode->key, pNode->data, pNode->size, size, pCacheObj->sizeInBytes);
H
Haojun Liao 已提交
626

H
Haojun Liao 已提交
627 628 629
            if (pCacheObj->freeFp) {
              pCacheObj->freeFp(pNode->data);
            }
H
Haojun Liao 已提交
630

wafwerar's avatar
wafwerar 已提交
631
            taosMemoryFree(pNode);
H
Haojun Liao 已提交
632
          }
H
Haojun Liao 已提交
633
        }
634 635

        taosWUnLockLatch(&pe->latch);
636
      } else {
S
cache  
Shengliang Guan 已提交
637 638
        uDebug("cache:%s, key:%p, %p has been removed from hash table by others already, refcnt:%d", pCacheObj->name,
               pNode->key, pNode->data, ref);
H
Haojun Liao 已提交
639
      }
H
Haojun Liao 已提交
640 641
    }

642
  } else {
H
Haojun Liao 已提交
643
    // NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
S
cache  
Shengliang Guan 已提交
644 645 646
    char *key = pNode->key;
    char *p = pNode->data;

H
Haojun Liao 已提交
647
    int32_t ref = T_REF_DEC(pNode);
H
Haojun Liao 已提交
648
    uDebug("cache:%s, key:%p, %p released, refcnt:%d, data in trashcan:%d", pCacheObj->name, key, p, ref, inTrashcan);
H
Haojun Liao 已提交
649 650
  }
}
H
Haojun Liao 已提交
651

L
Liu Jicong 已提交
652
void doTraverseElems(SCacheObj *pCacheObj, bool (*fp)(void *param, SCacheNode *pNode), SCacheObjTravSup *pSup) {
653 654 655 656 657 658 659 660 661
  int32_t numOfEntries = (int32_t)pCacheObj->capacity;
  for (int32_t i = 0; i < numOfEntries; ++i) {
    SCacheEntry *pEntry = &pCacheObj->pEntryList[i];
    if (pEntry->num == 0) {
      continue;
    }

    taosWLockLatch(&pEntry->latch);

D
dapan1121 已提交
662
    SCacheNode **pPre = &pEntry->next;
663 664 665 666 667
    SCacheNode *pNode = pEntry->next;
    while (pNode != NULL) {
      SCacheNode *next = pNode->pNext;

      if (fp(pSup, pNode)) {
D
dapan1121 已提交
668
        pPre = &pNode->pNext;
669 670
        pNode = pNode->pNext;
      } else {
D
dapan1121 已提交
671
        *pPre = next;
672
        pEntry->num -= 1;
D
dapan1121 已提交
673
        ASSERT((pEntry->next && pEntry->num > 0) || (NULL == pEntry->next && pEntry->num == 0));
674

wafwerar's avatar
wafwerar 已提交
675
        atomic_sub_fetch_ptr(&pCacheObj->numOfElems, 1);
676 677 678 679 680 681 682
        pNode = next;
      }
    }

    taosWUnLockLatch(&pEntry->latch);
  }
}
H
Haojun Liao 已提交
683

L
Liu Jicong 已提交
684
void taosCacheEmpty(SCacheObj *pCacheObj) {
685 686
  SCacheObjTravSup sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()};
  doTraverseElems(pCacheObj, doRemoveNodeFn, &sup);
H
Haojun Liao 已提交
687
  taosTrashcanEmpty(pCacheObj, false);
688 689 690 691 692 693
}

void taosCacheCleanup(SCacheObj *pCacheObj) {
  if (pCacheObj == NULL) {
    return;
  }
694 695

  pCacheObj->deleting = 1;
696 697

  // wait for the refresh thread quit before destroying the cache object.
698
  // But in the dll, the child thread will be killed before atexit takes effect.
S
cache  
Shengliang Guan 已提交
699 700 701
  while (atomic_load_8(&pCacheObj->deleting) != 0) {
    if (refreshWorkerNormalStopped) break;
    if (refreshWorkerUnexpectedStopped) return;
702
    taosMsleep(50);
S
TD-2805  
Shengliang Guan 已提交
703
  }
704

H
Haojun Liao 已提交
705
  uInfo("cache:%s will be cleaned up", pCacheObj->name);
706 707 708
  doCleanupDataCache(pCacheObj);
}

709 710
SCacheNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration) {
  size_t sizeInBytes = size + sizeof(SCacheNode) + keyLen;
711

wafwerar's avatar
wafwerar 已提交
712
  SCacheNode *pNewNode = taosMemoryCalloc(1, sizeInBytes);
713
  if (pNewNode == NULL) {
714
    terrno = TSDB_CODE_OUT_OF_MEMORY;
715 716 717 718
    uError("failed to allocate memory, reason:%s", strerror(errno));
    return NULL;
  }

L
Liu Jicong 已提交
719
  pNewNode->data = (char *)pNewNode + sizeof(SCacheNode);
720
  pNewNode->dataLen = size;
721 722
  memcpy(pNewNode->data, pData, size);

L
Liu Jicong 已提交
723
  pNewNode->key = (char *)pNewNode + sizeof(SCacheNode) + size;
724
  pNewNode->keyLen = (uint16_t)keyLen;
725 726 727

  memcpy(pNewNode->key, key, keyLen);

L
Liu Jicong 已提交
728 729
  pNewNode->addedTime = (uint64_t)taosGetTimestampMs();
  pNewNode->lifespan = duration;
S
cache  
Shengliang Guan 已提交
730
  pNewNode->expireTime = pNewNode->addedTime + pNewNode->lifespan;
wafwerar's avatar
wafwerar 已提交
731
  pNewNode->signature = pNewNode;
L
Liu Jicong 已提交
732
  pNewNode->size = (uint32_t)sizeInBytes;
733 734 735 736

  return pNewNode;
}

737
void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheNode *pNode) {
H
Haojun Liao 已提交
738
  if (pNode->inTrashcan) { /* node is already in trash */
739
    assert(pNode->pTNodeHeader != NULL && pNode->pTNodeHeader->pData == pNode);
740 741 742
    return;
  }

743
  __trashcan_wr_lock(pCacheObj);
wafwerar's avatar
wafwerar 已提交
744
  STrashElem *pElem = taosMemoryCalloc(1, sizeof(STrashElem));
745
  pElem->pData = pNode;
746 747
  pElem->prev = NULL;
  pElem->next = NULL;
H
Haojun Liao 已提交
748
  pNode->inTrashcan = true;
749
  pNode->pTNodeHeader = pElem;
H
Haojun Liao 已提交
750

751 752 753 754 755 756 757
  pElem->next = pCacheObj->pTrash;
  if (pCacheObj->pTrash) {
    pCacheObj->pTrash->prev = pElem;
  }

  pCacheObj->pTrash = pElem;
  pCacheObj->numOfElemsInTrash++;
758
  __trashcan_unlock(pCacheObj);
759

760 761
  uDebug("cache:%s key:%p, %p move to trashcan, pTrashElem:%p, numOfElem in trashcan:%d", pCacheObj->name, pNode->key,
         pNode->data, pElem, pCacheObj->numOfElemsInTrash);
762 763
}

H
Haojun Liao 已提交
764
void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) {
765
  __trashcan_wr_lock(pCacheObj);
766 767 768

  if (pCacheObj->numOfElemsInTrash == 0) {
    if (pCacheObj->pTrash != NULL) {
H
Haojun Liao 已提交
769
      pCacheObj->pTrash = NULL;
S
cache  
Shengliang Guan 已提交
770 771
      uError("cache:%s, key:inconsistency data in cache, numOfElem in trashcan:%d", pCacheObj->name,
             pCacheObj->numOfElemsInTrash);
772 773
    }

774
    __trashcan_unlock(pCacheObj);
775
    return;
H
hjxilinx 已提交
776
  }
777

S
cache  
Shengliang Guan 已提交
778
  const char *stat[] = {"false", "true"};
H
Haojun Liao 已提交
779
  uDebug("cache:%s start to cleanup trashcan, numOfElem in trashcan:%d, free:%s", pCacheObj->name,
S
cache  
Shengliang Guan 已提交
780
         pCacheObj->numOfElemsInTrash, (force ? stat[1] : stat[0]));
781

H
Haojun Liao 已提交
782
  STrashElem *pElem = pCacheObj->pTrash;
783 784
  while (pElem) {
    T_REF_VAL_CHECK(pElem->pData);
H
Haojun Liao 已提交
785
    assert(pElem->next != pElem && pElem->prev != pElem);
786 787

    if (force || (T_REF_VAL_GET(pElem->pData) == 0)) {
S
cache  
Shengliang Guan 已提交
788 789
      uDebug("cache:%s, key:%p, %p removed from trashcan. numOfElem in trashcan:%d", pCacheObj->name, pElem->pData->key,
             pElem->pData->data, pCacheObj->numOfElemsInTrash - 1);
790

H
Haojun Liao 已提交
791 792 793
      doRemoveElemInTrashcan(pCacheObj, pElem);
      doDestroyTrashcanElem(pCacheObj, pElem);
      pElem = pCacheObj->pTrash;
794 795 796 797 798
    } else {
      pElem = pElem->next;
    }
  }

799
  __trashcan_unlock(pCacheObj);
800 801 802
}

void doCleanupDataCache(SCacheObj *pCacheObj) {
803 804
  SCacheObjTravSup sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()};
  doTraverseElems(pCacheObj, doRemoveNodeFn, &sup);
805

H
Haojun Liao 已提交
806
  // todo memory leak if there are object with refcount greater than 0 in hash table?
807
  taosTrashcanEmpty(pCacheObj, true);
H
Haojun Liao 已提交
808

809
  __trashcan_lock_destroy(pCacheObj);
S
cache  
Shengliang Guan 已提交
810

wafwerar's avatar
wafwerar 已提交
811 812 813
  taosMemoryFreeClear(pCacheObj->pEntryList);
  taosMemoryFreeClear(pCacheObj->name);
  taosMemoryFree(pCacheObj);
814
}
815

S
cache  
Shengliang Guan 已提交
816
static void doCacheRefresh(SCacheObj *pCacheObj, int64_t time, __cache_trav_fn_t fp, void *param1) {
H
Haojun Liao 已提交
817
  assert(pCacheObj != NULL);
818

819 820
  SCacheObjTravSup sup = {.pCacheObj = pCacheObj, .fp = fp, .time = time, .param1 = param1};
  doTraverseElems(pCacheObj, doRemoveExpiredFn, &sup);
821 822
}

823
void taosCacheRefreshWorkerUnexpectedStopped(void) {
S
cache  
Shengliang Guan 已提交
824 825
  if (!refreshWorkerNormalStopped) {
    refreshWorkerUnexpectedStopped = true;
826 827 828
  }
}

S
cache  
Shengliang Guan 已提交
829
void *taosCacheTimedRefresh(void *handle) {
830 831
  assert(pCacheArrayList != NULL);
  uDebug("cache refresh thread starts");
832

H
Haojun Liao 已提交
833
  setThreadName("cacheRefresh");
834

S
cache  
Shengliang Guan 已提交
835 836
  const int32_t SLEEP_DURATION = 500;  // 500 ms
  int64_t       count = 0;
wafwerar's avatar
wafwerar 已提交
837 838 839 840 841
#ifdef WINDOWS
  if (taosCheckCurrentInDll()) {
    atexit(taosCacheRefreshWorkerUnexpectedStopped);
  }
#endif
842

S
cache  
Shengliang Guan 已提交
843
  while (1) {
844
    taosMsleep(SLEEP_DURATION);
845 846 847
    if (stopRefreshWorker) {
      goto _end;
    }
H
Haojun Liao 已提交
848

wafwerar's avatar
wafwerar 已提交
849
    taosThreadMutexLock(&guard);
850
    size_t size = taosArrayGetSize(pCacheArrayList);
wafwerar's avatar
wafwerar 已提交
851
    taosThreadMutexUnlock(&guard);
852

853
    count += 1;
854

S
cache  
Shengliang Guan 已提交
855
    for (int32_t i = 0; i < size; ++i) {
wafwerar's avatar
wafwerar 已提交
856
      taosThreadMutexLock(&guard);
S
cache  
Shengliang Guan 已提交
857
      SCacheObj *pCacheObj = taosArrayGetP(pCacheArrayList, i);
858

859
      if (pCacheObj == NULL) {
860
        uError("object is destroyed. ignore and try next");
wafwerar's avatar
wafwerar 已提交
861
        taosThreadMutexUnlock(&guard);
862
        continue;
863
      }
864

865 866
      // check if current cache object will be deleted every 500ms.
      if (pCacheObj->deleting) {
867 868 869
        taosArrayRemove(pCacheArrayList, i);
        size = taosArrayGetSize(pCacheArrayList);

S
cache  
Shengliang Guan 已提交
870 871
        uDebug("%s is destroying, remove it from refresh list, remain cache obj:%" PRIzu, pCacheObj->name, size);
        pCacheObj->deleting = 0;  // reset the deleting flag to enable pCacheObj to continue releasing resources.
872

wafwerar's avatar
wafwerar 已提交
873
        taosThreadMutexUnlock(&guard);
874
        continue;
875
      }
876

wafwerar's avatar
wafwerar 已提交
877
      taosThreadMutexUnlock(&guard);
878

879 880 881 882
      if ((count % pCacheObj->checkTick) != 0) {
        continue;
      }

883
      size_t elemInHash = pCacheObj->numOfElems;
884 885 886 887 888 889 890 891 892 893
      if (elemInHash + pCacheObj->numOfElemsInTrash == 0) {
        continue;
      }

      uDebug("%s refresh thread scan", pCacheObj->name);
      pCacheObj->statistics.refreshCount++;

      // refresh data in hash table
      if (elemInHash > 0) {
        int64_t now = taosGetTimestampMs();
A
AlexDuan 已提交
894
        doCacheRefresh(pCacheObj, now, NULL, NULL);
895 896 897 898
      }

      taosTrashcanEmpty(pCacheObj, false);
    }
899 900
  }

S
cache  
Shengliang Guan 已提交
901
_end:
902
  taosArrayDestroy(pCacheArrayList);
H
Haojun Liao 已提交
903 904

  pCacheArrayList = NULL;
wafwerar's avatar
wafwerar 已提交
905
  taosThreadMutexDestroy(&guard);
S
cache  
Shengliang Guan 已提交
906
  refreshWorkerNormalStopped = true;
H
Haojun Liao 已提交
907

908
  uDebug("cache refresh thread quits");
909
  return NULL;
dengyihao's avatar
dengyihao 已提交
910
}
911

S
cache  
Shengliang Guan 已提交
912
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void *param1) {
913 914 915 916 917
  if (pCacheObj == NULL) {
    return;
  }

  int64_t now = taosGetTimestampMs();
A
AlexDuan 已提交
918
  doCacheRefresh(pCacheObj, now, fp, param1);
919
}
920

H
Haojun Liao 已提交
921 922
void taosStopCacheRefreshWorker(void) {
  stopRefreshWorker = true;
wafwerar's avatar
wafwerar 已提交
923 924
  TdThreadOnce tmp = PTHREAD_ONCE_INIT;
  if (memcmp(&cacheRefreshWorker, &tmp, sizeof(TdThreadOnce)) != 0) taosThreadJoin(cacheRefreshWorker, NULL);
925
  taosArrayDestroy(pCacheArrayList);
H
Haojun Liao 已提交
926 927
}

L
Liu Jicong 已提交
928
size_t taosCacheGetNumOfObj(const SCacheObj *pCacheObj) { return pCacheObj->numOfElems + pCacheObj->numOfElemsInTrash; }
H
Haojun Liao 已提交
929

L
Liu Jicong 已提交
930
SCacheIter *taosCacheCreateIter(const SCacheObj *pCacheObj) {
H
Haojun Liao 已提交
931
  ASSERT(pCacheObj != NULL);
L
Liu Jicong 已提交
932 933
  SCacheIter *pIter = taosMemoryCalloc(1, sizeof(SCacheIter));
  pIter->pCacheObj = (SCacheObj *)pCacheObj;
H
Haojun Liao 已提交
934
  pIter->entryIndex = -1;
L
Liu Jicong 已提交
935
  pIter->index = -1;
H
Haojun Liao 已提交
936 937 938
  return pIter;
}

L
Liu Jicong 已提交
939 940
bool taosCacheIterNext(SCacheIter *pIter) {
  SCacheObj *pCacheObj = pIter->pCacheObj;
H
Haojun Liao 已提交
941 942 943

  if (pIter->index + 1 >= pIter->numOfObj) {
    // release the reference for all objects in the snapshot
L
Liu Jicong 已提交
944 945 946
    for (int32_t i = 0; i < pIter->numOfObj; ++i) {
      char *p = pIter->pCurrent[i]->data;
      taosCacheRelease(pCacheObj, (void **)&p, false);
H
Haojun Liao 已提交
947
      pIter->pCurrent[i] = NULL;
948 949 950 951
    }   

    if (pIter->entryIndex + 1 >= pCacheObj->capacity) {
      return false;
H
Haojun Liao 已提交
952 953
    }

954 955 956 957 958 959 960
    while (1) {
      pIter->entryIndex++;
      if (pIter->entryIndex >= pCacheObj->capacity) {
        return false;
      }

      SCacheEntry *pEntry = &pCacheObj->pEntryList[pIter->entryIndex];
H
Haojun Liao 已提交
961 962 963 964 965 966 967 968
      taosRLockLatch(&pEntry->latch);

      if (pEntry->num == 0) {
        taosRUnLockLatch(&pEntry->latch);
        continue;
      }

      if (pIter->numOfObj < pEntry->num) {
wafwerar's avatar
wafwerar 已提交
969
        char *tmp = taosMemoryRealloc(pIter->pCurrent, pEntry->num * POINTER_BYTES);
H
Haojun Liao 已提交
970 971 972 973 974 975 976 977 978
        if (tmp == NULL) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          taosRUnLockLatch(&pEntry->latch);
          return false;
        }

        pIter->pCurrent = (SCacheNode **)tmp;
      }

L
Liu Jicong 已提交
979
      SCacheNode *pNode = pEntry->next;
H
Haojun Liao 已提交
980 981 982 983 984 985 986 987 988 989 990 991 992
      for (int32_t i = 0; i < pEntry->num; ++i) {
        ASSERT(pNode != NULL);

        pIter->pCurrent[i] = pNode;
        int32_t ref = T_REF_INC(pIter->pCurrent[i]);
        ASSERT(ref >= 1);

        pNode = pNode->pNext;
      }

      pIter->numOfObj = pEntry->num;
      taosRUnLockLatch(&pEntry->latch);

L
Liu Jicong 已提交
993
      pIter->index = -1;
H
Haojun Liao 已提交
994 995 996 997 998 999 1000 1001
      break;
    }
  }

  pIter->index += 1;
  return true;
}

L
Liu Jicong 已提交
1002 1003
void *taosCacheIterGetData(const SCacheIter *pIter, size_t *len) {
  SCacheNode *pNode = pIter->pCurrent[pIter->index];
H
Haojun Liao 已提交
1004 1005 1006 1007
  *len = pNode->dataLen;
  return pNode->data;
}

L
Liu Jicong 已提交
1008 1009
void *taosCacheIterGetKey(const SCacheIter *pIter, size_t *len) {
  SCacheNode *pNode = pIter->pCurrent[pIter->index];
H
Haojun Liao 已提交
1010 1011 1012 1013
  *len = pNode->keyLen;
  return pNode->key;
}

L
Liu Jicong 已提交
1014
void taosCacheDestroyIter(SCacheIter *pIter) {
wafwerar's avatar
wafwerar 已提交
1015 1016
  taosMemoryFreeClear(pIter->pCurrent);
  taosMemoryFreeClear(pIter);
L
Liu Jicong 已提交
1017
}