tcache.c 31.2 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#define _DEFAULT_SOURCE
S
cache  
Shengliang Guan 已提交
17
#include "tcache.h"
18
#include "taoserror.h"
wafwerar's avatar
wafwerar 已提交
19
#include "osThread.h"
S
log  
Shengliang Guan 已提交
20
#include "tlog.h"
H
hzcheng 已提交
21 22
#include "tutil.h"

L
Liu Jicong 已提交
23 24
// Bounds used by getCacheCapacity() to clamp a requested capacity.
// The replacement lists are parenthesized so the macros expand safely inside larger
// expressions (e.g. `x / CACHE_DEFAULT_CAPACITY` or `x % CACHE_DEFAULT_CAPACITY`).
#define CACHE_MAX_CAPACITY     (1024 * 1024 * 16)
#define CACHE_DEFAULT_CAPACITY (1024 * 4)
H
Haojun Liao 已提交
25

L
Liu Jicong 已提交
26
static TdThread      cacheRefreshWorker = {0};
wafwerar's avatar
wafwerar 已提交
27
static TdThreadOnce  cacheThreadInit = PTHREAD_ONCE_INIT;
wafwerar's avatar
wafwerar 已提交
28
static TdThreadMutex guard = TD_PTHREAD_MUTEX_INITIALIZER;
L
Liu Jicong 已提交
29 30 31 32
static SArray       *pCacheArrayList = NULL;
static bool          stopRefreshWorker = false;
static bool          refreshWorkerNormalStopped = false;
static bool          refreshWorkerUnexpectedStopped = false;
33 34 35 36 37 38 39

typedef struct SCacheNode {
  uint64_t           addedTime;   // the added time when this element is added or updated into cache
  uint64_t           lifespan;    // life duration when this element should be remove from cache
  int64_t            expireTime;  // expire time
  uint64_t           signature;
  struct STrashElem *pTNodeHeader;    // point to trash node head
L
Liu Jicong 已提交
40
  uint16_t           keyLen : 15;     // max key size: 32kb
41 42 43 44 45 46 47 48 49 50
  bool               inTrashcan : 1;  // denote if it is in trash or not
  uint32_t           size;            // allocated size for current SCacheNode
  uint32_t           dataLen;
  T_REF_DECLARE()
  struct SCacheNode *pNext;
  char              *key;
  char              *data;
} SCacheNode;

typedef struct SCacheEntry {
L
Liu Jicong 已提交
51 52
  int32_t     num;    // number of elements in current entry
  SRWLatch    latch;  // entry latch
53 54 55
  SCacheNode *next;
} SCacheEntry;

56
struct STrashElem {
57 58 59
  struct STrashElem *prev;
  struct STrashElem *next;
  SCacheNode        *pData;
60
};
61

62
struct SCacheIter {
H
Haojun Liao 已提交
63 64 65 66 67
  SCacheObj   *pCacheObj;
  SCacheNode **pCurrent;
  int32_t      entryIndex;
  int32_t      index;
  int32_t      numOfObj;
68
};
H
Haojun Liao 已提交
69

70 71 72 73 74 75 76 77 78
/*
 * The trash list accommodates old data that has the same key as a new node in the hash list.
 * When a new node is put into the cache and a node with the same key already exists:
 * 1. if the old one is not referenced, it is replaced.
 * 2. otherwise, the old one is moved to pTrash and the new one is added.
 *
 * When a node in pTrash is no longer referenced, it will be released at its expiration time.
 */
struct SCacheObj {
L
Liu Jicong 已提交
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
  int64_t      sizeInBytes;  // total allocated buffer in this hash table, SCacheObj is not included.
  int64_t      refreshTime;
  char        *name;
  SCacheStatis statistics;

  SCacheEntry      *pEntryList;
  size_t            capacity;    // number of slots
  size_t            numOfElems;  // number of elements in cache
  _hash_fn_t        hashFp;      // hash function
  __cache_free_fn_t freeFp;

  uint32_t    numOfElemsInTrash;  // number of element in trash
  STrashElem *pTrash;

  uint8_t  deleting;  // set the deleting flag to stop refreshing ASAP.
  TdThread refreshWorker;
  bool     extendLifespan;  // auto extend life span when one item is accessed.
  int64_t  checkTick;       // tick used to record the check times of the refresh threads
97
#if defined(LINUX)
wafwerar's avatar
wafwerar 已提交
98
  TdThreadRwlock lock;
99
#else
wafwerar's avatar
wafwerar 已提交
100
  TdThreadMutex lock;
101 102 103 104 105 106 107 108 109 110 111
#endif
};

// Context handed by doTraverseElems() to the per-node callbacks (doRemoveExpiredFn / doRemoveNodeFn).
typedef struct SCacheObjTravSup {
  SCacheObj        *pCacheObj;  // cache being traversed
  int64_t           time;       // reference timestamp; doRemoveExpiredFn compares node expire times against it
  __cache_trav_fn_t fp;         // optional callback, invoked by doRemoveExpiredFn on the data of nodes that are kept
  void             *param1;     // opaque argument forwarded to fp
} SCacheObjTravSup;

// Take the trashcan lock exclusively: a write lock on the rwlock when LINUX is defined,
// the plain mutex otherwise.
static FORCE_INLINE void __trashcan_wr_lock(SCacheObj *pCacheObj) {
#ifdef LINUX
  taosThreadRwlockWrlock(&pCacheObj->lock);
#else
  taosThreadMutexLock(&pCacheObj->lock);
#endif
}

119
// Release the trashcan lock taken by __trashcan_wr_lock().
static FORCE_INLINE void __trashcan_unlock(SCacheObj *pCacheObj) {
#ifdef LINUX
  taosThreadRwlockUnlock(&pCacheObj->lock);
#else
  taosThreadMutexUnlock(&pCacheObj->lock);
#endif
}

127
// Initialize the trashcan lock with default attributes.
// Returns the result of the underlying init call; taosCacheInit() treats non-zero as failure.
static FORCE_INLINE int32_t __trashcan_lock_init(SCacheObj *pCacheObj) {
#ifdef LINUX
  return taosThreadRwlockInit(&pCacheObj->lock, NULL);
#else
  return taosThreadMutexInit(&pCacheObj->lock, NULL);
#endif
}

135
// Destroy the trashcan lock created by __trashcan_lock_init().
static FORCE_INLINE void __trashcan_lock_destroy(SCacheObj *pCacheObj) {
#ifdef LINUX
  taosThreadRwlockDestroy(&pCacheObj->lock);
#else
  taosThreadMutexDestroy(&pCacheObj->lock);
#endif
}

143 144 145 146 147 148 149 150 151 152
/**
 * do cleanup the taos cache
 * @param pCacheObj
 */
static void doCleanupDataCache(SCacheObj *pCacheObj);

/**
 * refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime
 * @param handle   Cache object handle
 */
S
cache  
Shengliang Guan 已提交
153
static void *taosCacheTimedRefresh(void *handle);
154

H
Haojun Liao 已提交
155
static void doInitRefreshThread(void) {
156 157
  pCacheArrayList = taosArrayInit(4, POINTER_BYTES);

wafwerar's avatar
wafwerar 已提交
158 159 160
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
  taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);
161

wafwerar's avatar
wafwerar 已提交
162 163
  taosThreadCreate(&cacheRefreshWorker, &thattr, taosCacheTimedRefresh, NULL);
  taosThreadAttrDestroy(&thattr);
164 165
}

wafwerar's avatar
wafwerar 已提交
166 167
// Register a cache object with the refresh machinery.
// The first call starts the refresh thread (through the once-control); every call appends
// the object pointer to pCacheArrayList under `guard`. Returns the refresh thread handle.
TdThread doRegisterCacheObj(SCacheObj *pCacheObj) {
  TdThread worker;

  taosThreadOnce(&cacheThreadInit, doInitRefreshThread);

  taosThreadMutexLock(&guard);
  taosArrayPush(pCacheArrayList, &pCacheObj);
  taosThreadMutexUnlock(&guard);

  worker = cacheRefreshWorker;
  return worker;
}

H
hzcheng 已提交
176 177 178 179 180 181
/**
 * @param key      key of object for hash, usually a null-terminated string
 * @param keyLen   length of key
 * @param pData    actually data. required a consecutive memory block, no pointer is allowed
 *                 in pData. Pointer copy causes memory access error.
 * @param size     size of block
182
 * @param duration total survival time from now
183
 * @return         SCacheNode
H
hzcheng 已提交
184
 */
185
static SCacheNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size,
L
Liu Jicong 已提交
186
                                       uint64_t duration);
H
hzcheng 已提交
187 188

/**
189
 * add object node into trash, and this object is closed for referencing once it is added to trash
H
hzcheng 已提交
190
 * It will not be removed until pNode->refCount == 0
191
 * @param pCacheObj    Cache object
H
hzcheng 已提交
192 193
 * @param pNode   Cache slot object
 */
194
static void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheNode *pNode);
195

H
hzcheng 已提交
196 197 198
/**
 * remove nodes in trash with refCount == 0 in cache
 * @param pNode
199
 * @param pCacheObj
H
hzcheng 已提交
200 201 202
 * @param force   force mode: if true, remove data in trash without checking the refcount.
 *                This may cause corruption, so force mode only applies before the cache is closed.
 */
H
Haojun Liao 已提交
203
static void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force);
H
hzcheng 已提交
204 205 206

/**
 * release node
207
 * @param pCacheObj      cache object
H
Haojun Liao 已提交
208
 * @param pNode          data node
H
hzcheng 已提交
209
 */
210
// Destroy a cache node: subtract its size from the cache accounting, run the user free
// callback on its data and free the node memory. The node is NOT unlinked here; the
// traversal callbacks that call this report the removal back to doTraverseElems().
// Nodes whose signature does not match their own address are rejected and left untouched.
static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheNode *pNode) {
  if (pNode->signature != (uint64_t)pNode) {
    // The node failed validation and keys are stored without a NUL terminator, so the key
    // must not be printed as a string: log the addresses only.
    uError("key:%p, %p data is invalid, or has been released", pNode->key, pNode);
    return;
  }

  atomic_sub_fetch_64(&pCacheObj->sizeInBytes, pNode->size);

  uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, total num:%d size:%" PRId64 "bytes",
         pCacheObj->name, pNode->key, pNode->data, pNode->size, (int)pCacheObj->numOfElems - 1, pCacheObj->sizeInBytes);

  if (pCacheObj->freeFp) {
    pCacheObj->freeFp(pNode->data);
  }

  taosMemoryFree(pNode);
}

S
cache  
Shengliang Guan 已提交
228 229
// Unlink pElem from the trashcan list and decrement the trash counter; the element and its
// node are not freed. Returns the element that followed pElem, or NULL when pElem was the
// tail. Also returns NULL, without unlinking anything, when the wrapped node fails the
// signature check.
static FORCE_INLINE STrashElem *doRemoveElemInTrashcan(SCacheObj *pCacheObj, STrashElem *pElem) {
  SCacheNode *pWrapped = pElem->pData;
  if (pWrapped->signature != (uint64_t)pWrapped) {
    uWarn("key:sig:0x%" PRIx64 " %p data has been released, ignore", pWrapped->signature, pWrapped);
    return NULL;
  }

  STrashElem *before = pElem->prev;
  STrashElem *after = pElem->next;

  pCacheObj->numOfElemsInTrash -= 1;

  if (before != NULL) {
    before->next = after;
  } else {
    // pElem was the list head, so the head moves forward
    pCacheObj->pTrash = after;
  }

  if (after != NULL) {
    after->prev = before;
  }

  // an empty trashcan must not have a head any more
  assert(pCacheObj->numOfElemsInTrash != 0 || pCacheObj->pTrash == NULL);

  return after;
}

S
cache  
Shengliang Guan 已提交
254
// Free a trash element together with the node it wraps, running the user free callback
// on the node's data first. The element is expected to be unlinked by the caller.
static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj *pCacheObj, STrashElem *pElem) {
  SCacheNode *pWrapped = pElem->pData;

  if (pCacheObj->freeFp != NULL) {
    pCacheObj->freeFp(pWrapped->data);
  }

  taosMemoryFree(pWrapped);
  taosMemoryFree(pElem);
}
H
hzcheng 已提交
262

263 264 265 266 267 268 269 270
// Link pNode at the head of the entry's list and bump the entry's element count.
static void pushfrontNodeInEntryList(SCacheEntry *pEntry, SCacheNode *pNode) {
  assert(pEntry != NULL && pNode != NULL);

  SCacheNode *oldHead = pEntry->next;
  pNode->pNext = oldHead;
  pEntry->next = pNode;
  pEntry->num++;
}

L
Liu Jicong 已提交
271
// Unlink pNode from the entry's list. `prev` is the node right before pNode, or NULL when
// pNode is the list head. The node's own link is cleared and the entry count decremented.
static void removeNodeInEntryList(SCacheEntry *pe, SCacheNode *prev, SCacheNode *pNode) {
  SCacheNode *successor = pNode->pNext;

  if (prev != NULL) {
    prev->pNext = successor;
  } else {
    ASSERT(pe->next == pNode);
    pe->next = successor;
  }

  pNode->pNext = NULL;
  pe->num--;
}

L
Liu Jicong 已提交
283
// Map a key to its hash table slot: hash the key with the cache's hash function and
// reduce the hash modulo the number of slots.
static FORCE_INLINE SCacheEntry *doFindEntry(SCacheObj *pCacheObj, const void *key, size_t keyLen) {
  const uint32_t h = pCacheObj->hashFp(key, keyLen);
  const int32_t  idx = (int32_t)(h % pCacheObj->capacity);
  return pCacheObj->pEntryList + idx;
}

L
Liu Jicong 已提交
289 290
// Look up a key in one entry's list. Returns the matching node or NULL.
// *prev is set to each node that is skipped, so on a hit it holds the node right before the
// match; it is left untouched when the match is the list head or the list is empty.
static FORCE_INLINE SCacheNode *doSearchInEntryList(SCacheEntry *pe, const void *key, size_t keyLen,
                                                    SCacheNode **prev) {
  for (SCacheNode *cur = pe->next; cur != NULL; cur = cur->pNext) {
    if (cur->keyLen == keyLen && memcmp(cur->key, key, keyLen) == 0) {
      return cur;
    }
    *prev = cur;
  }

  return NULL;
}

L
Liu Jicong 已提交
303
// Traversal callback: destroy nodes that are expired and unreferenced.
// `param` is the SCacheObjTravSup of the traversal. Returns false when the node was
// released (the caller must unlink it), true when the node stays in the hash table; for
// nodes that stay, the optional user callback is invoked on the node data.
static bool doRemoveExpiredFn(void *param, SCacheNode *pNode) {
  SCacheObjTravSup *pSup = param;

  const bool expired = (int64_t)pNode->expireTime < pSup->time;
  if (expired && T_REF_VAL_GET(pNode) <= 0) {
    taosCacheReleaseNode(pSup->pCacheObj, pNode);
    return false;  // gone: remove it from the hash table
  }

  if (pSup->fp != NULL) {
    pSup->fp(pNode->data, pSup->param1);
  }

  return true;  // keep it in the hash table
}

// Traversal callback: remove every node unconditionally. Unreferenced nodes are destroyed
// right away, referenced ones are parked in the trashcan. Always returns false so that the
// caller unlinks the node from the hash table.
static bool doRemoveNodeFn(void *param, SCacheNode *pNode) {
  SCacheObj *pOwner = ((SCacheObjTravSup *)param)->pCacheObj;

  if (T_REF_VAL_GET(pNode) != 0) {
    taosAddToTrashcan(pOwner, pNode);
  } else {
    taosCacheReleaseNode(pOwner, pNode);
  }

  return false;
}

// Compute a table capacity for `length` elements: the default capacity doubled until it
// reaches `length`, clamped to [CACHE_DEFAULT_CAPACITY, CACHE_MAX_CAPACITY].
static FORCE_INLINE int32_t getCacheCapacity(int32_t length) {
  const int32_t lower = CACHE_DEFAULT_CAPACITY;
  const int32_t upper = CACHE_MAX_CAPACITY;

  if (length < lower) {
    return lower;
  }
  if (length > upper) {
    return upper;
  }

  int32_t cap = lower;
  while (cap < length && cap < upper) {
    cap *= 2;
  }

  return (cap <= upper) ? cap : upper;
}

L
Liu Jicong 已提交
354
/**
 * Create a cache object and register it with the refresh worker.
 *
 * @param keyType          key type, used to select the hash function
 * @param refreshTimeInMs  refresh period in milliseconds, must be > 0
 * @param extendLifespan   if true, releasing an element pushes its expire time forward
 * @param fn               optional callback invoked on an element's data before it is freed
 * @param cacheName        name used in log messages; copied, must not be NULL
 * @return                 the new cache object, or NULL on invalid arguments or failure
 */
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInMs, bool extendLifespan, __cache_free_fn_t fn,
                         const char *cacheName) {
  const int32_t SLEEP_DURATION = 500;  // 500 ms

  if (refreshTimeInMs <= 0) {
    return NULL;
  }

  // strdup(NULL) is undefined behavior, so reject a missing name up front
  if (cacheName == NULL) {
    uError("failed to init cache, cache name is NULL");
    return NULL;
  }

  SCacheObj *pCacheObj = (SCacheObj *)taosMemoryCalloc(1, sizeof(SCacheObj));
  if (pCacheObj == NULL) {
    uError("failed to allocate memory, reason:%s", strerror(errno));
    return NULL;
  }

  // TODO add the auto extend procedure
  pCacheObj->capacity = 4096;
  pCacheObj->pEntryList = taosMemoryCalloc(pCacheObj->capacity, sizeof(SCacheEntry));
  if (pCacheObj->pEntryList == NULL) {
    // log first: freeing may overwrite errno
    uError("failed to allocate memory, reason:%s", strerror(errno));
    taosMemoryFree(pCacheObj);
    return NULL;
  }

  // set free cache node callback function
  pCacheObj->hashFp = taosGetDefaultHashFunction(keyType);
  pCacheObj->freeFp = fn;
  pCacheObj->refreshTime = refreshTimeInMs;
  pCacheObj->checkTick = pCacheObj->refreshTime / SLEEP_DURATION;
  pCacheObj->extendLifespan = extendLifespan;  // the TTL after the last access

  if (__trashcan_lock_init(pCacheObj) != 0) {
    uError("failed to init lock, reason:%s", strerror(errno));
    taosMemoryFreeClear(pCacheObj->pEntryList);
    taosMemoryFree(pCacheObj);
    return NULL;
  }

  pCacheObj->name = strdup(cacheName);
  if (pCacheObj->name == NULL) {
    // the name is dereferenced by every log statement, so a cache without one is unusable
    uError("failed to allocate memory, reason:%s", strerror(errno));
    __trashcan_lock_destroy(pCacheObj);
    taosMemoryFreeClear(pCacheObj->pEntryList);
    taosMemoryFree(pCacheObj);
    return NULL;
  }

  doRegisterCacheObj(pCacheObj);
  return pCacheObj;
}
H
hzcheng 已提交
396

S
cache  
Shengliang Guan 已提交
397 398
/**
 * Insert a copy of pData under `key`. The new element is returned already referenced once
 * on behalf of the caller, who must hand it back with taosCacheRelease().
 *
 * If an element with the same key exists it is replaced: destroyed immediately when nobody
 * references it, moved to the trashcan otherwise.
 *
 * @param pCacheObj   cache object
 * @param key         key bytes
 * @param keyLen      key length in bytes
 * @param pData       data to copy into the cache
 * @param dataSize    data length in bytes
 * @param durationMS  lifespan of the element in milliseconds
 * @return            pointer to the cached copy of the data, or NULL on failure
 */
void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize,
                   int32_t durationMS) {
  if (pCacheObj == NULL || pCacheObj->pEntryList == NULL || pCacheObj->deleting == 1) {
    return NULL;
  }

  SCacheNode *pNode1 = taosCreateCacheNode(key, keyLen, pData, dataSize, durationMS);
  if (pNode1 == NULL) {
    uError("cache:%s, key:%p, failed to added into cache, out of memory", pCacheObj->name, key);
    return NULL;
  }

  // reference held by the caller
  T_REF_INC(pNode1);

  SCacheEntry *pe = doFindEntry(pCacheObj, key, keyLen);
  taosWLockLatch(&pe->latch);

  SCacheNode *prev = NULL;
  SCacheNode *pNode = doSearchInEntryList(pe, key, keyLen, &prev);
  const bool  replaced = (pNode != NULL);

  if (replaced) {  // duplicated key exists: take the old node out of the hash table first
    removeNodeInEntryList(pe, prev, pNode);

    if (T_REF_VAL_GET(pNode) == 0) {
      if (pCacheObj->freeFp) {
        pCacheObj->freeFp(pNode->data);
      }

      atomic_sub_fetch_64(&pCacheObj->sizeInBytes, pNode->size);
      taosMemoryFreeClear(pNode);
    } else {
      taosAddToTrashcan(pCacheObj, pNode);
      uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode1->data, pNode->data);
    }
  }

  pushfrontNodeInEntryList(pe, pNode1);
  if (!replaced) {
    // a replacement keeps the element count unchanged
    atomic_add_fetch_ptr(&pCacheObj->numOfElems, 1);
  }

  // sizeInBytes is an int64_t: always use the 64-bit atomic, never the pointer-width one
  atomic_add_fetch_64(&pCacheObj->sizeInBytes, pNode1->size);
  uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64
         ", totalNum:%d sizeInBytes:%" PRId64 "bytes size:%" PRId64 "bytes",
         pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime, (int32_t)pCacheObj->numOfElems,
         pCacheObj->sizeInBytes, (int64_t)dataSize);

  taosWUnLockLatch(&pe->latch);
  return pNode1->data;
}

H
Haojun Liao 已提交
454
/**
 * Look an element up by key and take a reference on it.
 *
 * @return the element's data on a hit (release it with taosCacheRelease), NULL on a miss,
 *         when the cache is NULL or when it is being deleted. Hit / miss / total access
 *         statistics are updated along the way.
 */
void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) {
  if (pCacheObj == NULL || pCacheObj->deleting == 1) {
    return NULL;
  }

  // fast path: nothing cached at all
  if (pCacheObj->numOfElems == 0) {
    atomic_add_fetch_64(&pCacheObj->statistics.missCount, 1);
    return NULL;
  }

  SCacheNode  *prev = NULL;
  SCacheEntry *pe = doFindEntry(pCacheObj, key, keyLen);

  taosRLockLatch(&pe->latch);
  SCacheNode *pFound = doSearchInEntryList(pe, key, keyLen, &prev);
  if (pFound != NULL) {
    // take the reference while the latch still pins the node in the list
    int32_t ref = T_REF_INC(pFound);
    ASSERT(ref > 0);
  }
  taosRUnLockLatch(&pe->latch);

  void *pData = NULL;
  if (pFound != NULL) {
    pData = pFound->data;
  }

  if (pData == NULL) {
    atomic_add_fetch_64(&pCacheObj->statistics.missCount, 1);
    uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
  } else {
    atomic_add_fetch_64(&pCacheObj->statistics.hitCount, 1);
    uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData,
           T_REF_VAL_GET(pFound));
  }

  atomic_add_fetch_64(&pCacheObj->statistics.totalAccess, 1);
  return pData;
}

491 492
/**
 * Take one more reference on an element through its data pointer.
 *
 * @param data  pointer previously returned by this cache; the node header is expected
 *              right in front of it
 * @return      `data` on success, NULL if an argument is NULL or the header fails the
 *              signature check
 */
void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
  if (pCacheObj == NULL) {
    return NULL;
  }
  if (data == NULL) {
    return NULL;
  }

  // step back from the payload to the node header that precedes it
  SCacheNode *ptNode = (SCacheNode *)((char *)data - sizeof(SCacheNode));
  if ((uint64_t)ptNode != ptNode->signature) {
    uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode);
    return NULL;
  }

  int32_t refAfter = T_REF_INC(ptNode);
  uDebug("cache:%s, data: %p acquired by data in cache, refcnt:%d", pCacheObj->name, ptNode->data, refAfter);

  // the caller already held a reference, so the count must be at least 2 now
  assert(refAfter >= 2);
  return data;
}

508
/**
 * Move ownership of a referenced data pointer out of *data: the pointer is returned and
 * *data is set to NULL. The reference count is not changed.
 *
 * @return the transferred pointer, or NULL if an argument is NULL or the node header in
 *         front of the data fails the signature check (in which case *data is untouched)
 */
void *taosCacheTransferData(SCacheObj *pCacheObj, void **data) {
  if (pCacheObj == NULL || data == NULL) {
    return NULL;
  }
  if (*data == NULL) {
    return NULL;
  }

  SCacheNode *ptNode = (SCacheNode *)((char *)(*data) - sizeof(SCacheNode));
  if ((uint64_t)ptNode != ptNode->signature) {
    uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode);
    return NULL;
  }

  assert(T_REF_VAL_GET(ptNode) >= 1);

  // hand the pointer over and clear the old holder's copy
  void *moved = *data;
  *data = NULL;
  return moved;
}
524

525
/**
 * Give back a reference obtained from taosCachePut / taosCacheAcquireByKey /
 * taosCacheAcquireByData and clear the caller's pointer.
 *
 * @param pCacheObj  cache object
 * @param data       address of the data pointer to release; *data is set to NULL
 * @param _remove    if true, also remove the element from the cache: it is destroyed once
 *                   unreferenced, or parked in the trashcan while other users still hold it
 */
void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
  if (pCacheObj == NULL) {
    return;
  }

  if (data == NULL || (*data) == NULL) {
    uError("cache:%s, NULL data to release", pCacheObj->name);
    return;
  }

  // The operation of removal from hash table and addition to trashcan is not an atomic operation,
  // therefore the check for the empty of both the hash table and the trashcan has a race condition.
  // It happens when there is only one object in the cache, and two threads which have referenced this
  // object start to free it simultaneously [TD-1569].
  SCacheNode *pNode = (SCacheNode *)((char *)(*data) - sizeof(SCacheNode));
  if (pNode->signature != (uint64_t)pNode) {
    uError("cache:%s, %p, release invalid cache data", pCacheObj->name, pNode);
    return;
  }

  *data = NULL;

  // note: extend lifespan before dec ref count
  bool inTrashcan = pNode->inTrashcan;

  if (pCacheObj->extendLifespan && (!inTrashcan) && (!_remove)) {
    atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs());
    uDebug("cache:%s, data:%p extend expire time: %" PRId64, pCacheObj->name, pNode->data, pNode->expireTime);
  }

  if (_remove) {
    // NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
    char *key = pNode->key;
    char *d = pNode->data;

    int32_t ref = T_REF_VAL_GET(pNode);
    uDebug("cache:%s, key:%p, %p is released, refcnt:%d, in trashcan:%d", pCacheObj->name, key, d, ref - 1, inTrashcan);

    /*
     * If it is not referenced by other users, remove it immediately. Otherwise move this node to trashcan wait for all
     * users releasing this resources.
     *
     * NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread
     * that tries to do the same thing.
     */
    if (inTrashcan) {
      ref = T_REF_VAL_GET(pNode);

      if (ref == 1) {
        // If it is the last ref, remove it from trashcan linked-list first, and then destroy it. Otherwise, it may be
        // destroyed by refresh worker if decrease ref count before removing it from linked-list.
        assert(pNode->pTNodeHeader->pData == pNode);

        __trashcan_wr_lock(pCacheObj);
        doRemoveElemInTrashcan(pCacheObj, pNode->pTNodeHeader);
        __trashcan_unlock(pCacheObj);

        ref = T_REF_DEC(pNode);
        assert(ref == 0);

        doDestroyTrashcanElem(pCacheObj, pNode->pTNodeHeader);
      } else {
        ref = T_REF_DEC(pNode);
        assert(ref >= 0);
      }
    } else {
      // NOTE: remove it from hash in the first place, otherwise, the pNode may have been released by other thread
      // when reaches here.
      SCacheNode  *prev = NULL;
      SCacheEntry *pe = doFindEntry(pCacheObj, pNode->key, pNode->keyLen);

      taosWLockLatch(&pe->latch);
      ref = T_REF_DEC(pNode);

      SCacheNode *p = doSearchInEntryList(pe, pNode->key, pNode->keyLen, &prev);

      if (p != NULL) {
        // successfully remove from hash table, if failed, this node must have been move to trash already, do nothing.
        // note that the remove operation can be executed only once.
        if (p != pNode) {
          uDebug(
              "cache:%s, key:%p, a new entry:%p found, refcnt:%d, prev entry:%p, refcnt:%d has been removed by "
              "others already, prev must in trashcan",
              pCacheObj->name, pNode->key, p->data, T_REF_VAL_GET(p), pNode->data, T_REF_VAL_GET(pNode));

          assert(p->pTNodeHeader == NULL && pNode->pTNodeHeader != NULL);
        } else {
          removeNodeInEntryList(pe, prev, p);
          uDebug("cache:%s, key:%p, %p successfully removed from hash table, refcnt:%d", pCacheObj->name, pNode->key,
                 pNode->data, ref);
          if (ref > 0) {
            assert(pNode->pTNodeHeader == NULL);
            taosAddToTrashcan(pCacheObj, pNode);
          } else {  // ref == 0
            atomic_sub_fetch_64(&pCacheObj->sizeInBytes, pNode->size);

            int32_t size = (int32_t)pCacheObj->numOfElems;
            uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, totalNum:%d size:%" PRId64 "bytes",
                   pCacheObj->name, pNode->key, pNode->data, pNode->size, size, pCacheObj->sizeInBytes);

            if (pCacheObj->freeFp) {
              pCacheObj->freeFp(pNode->data);
            }

            taosMemoryFree(pNode);
          }
        }
      } else {
        uDebug("cache:%s, key:%p, %p has been removed from hash table by others already, refcnt:%d", pCacheObj->name,
               pNode->key, pNode->data, ref);
      }

      // The latch must be dropped on every path, including when the node was no longer in
      // the hash table; otherwise this slot stays write-locked forever.
      taosWUnLockLatch(&pe->latch);
    }

  } else {
    // NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
    char *key = pNode->key;
    char *p = pNode->data;

    int32_t ref = T_REF_DEC(pNode);
    uDebug("cache:%s, key:%p, %p released, refcnt:%d, data in trashcan:%d", pCacheObj->name, key, p, ref, inTrashcan);
  }
}
H
Haojun Liao 已提交
649

L
Liu Jicong 已提交
650
/**
 * Visit every node of the hash table, slot by slot, under each slot's write latch.
 *
 * @param pCacheObj  cache to traverse
 * @param fp         per-node callback; returns true to keep the node in the table, false
 *                   when the node has been taken over (released or moved to the trashcan)
 *                   and must be unlinked
 * @param pSup       context passed to fp as its first argument
 */
void doTraverseElems(SCacheObj *pCacheObj, bool (*fp)(void *param, SCacheNode *pNode), SCacheObjTravSup *pSup) {
  int32_t numOfEntries = (int32_t)pCacheObj->capacity;
  for (int32_t i = 0; i < numOfEntries; ++i) {
    SCacheEntry *pEntry = &pCacheObj->pEntryList[i];
    if (pEntry->num == 0) {
      continue;
    }

    taosWLockLatch(&pEntry->latch);

    // ppLink is the link that currently points at pNode: the slot head at first, then the
    // pNext field of the last node that was kept. Unlinking through it preserves the kept
    // nodes that precede a removed one instead of dropping them from the list.
    SCacheNode **ppLink = &pEntry->next;
    SCacheNode  *pNode = pEntry->next;
    while (pNode != NULL) {
      // read the successor first: fp may free pNode
      SCacheNode *next = pNode->pNext;

      if (fp(pSup, pNode)) {
        ppLink = &pNode->pNext;
      } else {
        *ppLink = next;
        pEntry->num -= 1;

        atomic_sub_fetch_ptr(&pCacheObj->numOfElems, 1);
      }

      pNode = next;
    }

    taosWUnLockLatch(&pEntry->latch);
  }
}
H
Haojun Liao 已提交
678

L
Liu Jicong 已提交
679
// Remove every element from the hash table (unreferenced nodes are destroyed, referenced
// ones go to the trashcan), then purge the trashcan entries that are no longer referenced.
void taosCacheEmpty(SCacheObj *pCacheObj) {
  SCacheObjTravSup travCtx = {0};
  travCtx.pCacheObj = pCacheObj;
  travCtx.time = taosGetTimestampMs();
  travCtx.fp = NULL;

  doTraverseElems(pCacheObj, doRemoveNodeFn, &travCtx);
  taosTrashcanEmpty(pCacheObj, false);
}

// Tear a cache down: raise the deleting flag, wait until the flag is cleared or the refresh
// worker is reported as stopped, then clean up the cached data. Does nothing for NULL.
void taosCacheCleanup(SCacheObj *pCacheObj) {
  if (pCacheObj == NULL) {
    return;
  }

  pCacheObj->deleting = 1;

  // Wait for the refresh thread to quit before destroying the cache object.
  // But in the dll, the child thread will be killed before atexit takes effect.
  // NOTE(review): the flag is presumably cleared by the refresh thread, which is outside the visible code.
  for (;;) {
    if (atomic_load_8(&pCacheObj->deleting) == 0) {
      break;
    }
    if (refreshWorkerNormalStopped) {
      break;
    }
    if (refreshWorkerUnexpectedStopped) {
      // the worker stopped unexpectedly: leave without cleaning the cache up
      return;
    }

    taosMsleep(50);
  }

  uInfo("cache:%s will be cleaned up", pCacheObj->name);
  doCleanupDataCache(pCacheObj);
}

704 705
SCacheNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration) {
  size_t sizeInBytes = size + sizeof(SCacheNode) + keyLen;
706

wafwerar's avatar
wafwerar 已提交
707
  SCacheNode *pNewNode = taosMemoryCalloc(1, sizeInBytes);
708
  if (pNewNode == NULL) {
709
    terrno = TSDB_CODE_OUT_OF_MEMORY;
710 711 712 713
    uError("failed to allocate memory, reason:%s", strerror(errno));
    return NULL;
  }

L
Liu Jicong 已提交
714
  pNewNode->data = (char *)pNewNode + sizeof(SCacheNode);
715
  pNewNode->dataLen = size;
716 717
  memcpy(pNewNode->data, pData, size);

L
Liu Jicong 已提交
718
  pNewNode->key = (char *)pNewNode + sizeof(SCacheNode) + size;
719
  pNewNode->keyLen = (uint16_t)keyLen;
720 721 722

  memcpy(pNewNode->key, key, keyLen);

L
Liu Jicong 已提交
723 724
  pNewNode->addedTime = (uint64_t)taosGetTimestampMs();
  pNewNode->lifespan = duration;
S
cache  
Shengliang Guan 已提交
725
  pNewNode->expireTime = pNewNode->addedTime + pNewNode->lifespan;
L
Liu Jicong 已提交
726 727
  pNewNode->signature = (uint64_t)pNewNode;
  pNewNode->size = (uint32_t)sizeInBytes;
728 729 730 731

  return pNewNode;
}

732
/**
 * Park a node that is still referenced in the trashcan: wrap it in a new trash element and
 * link that element at the head of the trashcan list. A node already in the trashcan is
 * left as is.
 *
 * @param pCacheObj  cache object
 * @param pNode      node that has been removed from the hash table
 */
void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheNode *pNode) {
  if (pNode->inTrashcan) { /* node is already in trash */
    assert(pNode->pTNodeHeader != NULL && pNode->pTNodeHeader->pData == pNode);
    return;
  }

  // Allocate before taking the lock so that the failure path has nothing to undo.
  STrashElem *pElem = taosMemoryCalloc(1, sizeof(STrashElem));
  if (pElem == NULL) {
    // Without a trash element the node cannot be tracked; report it instead of
    // dereferencing NULL. The node stays allocated for the users still referencing it.
    uError("cache:%s key:%p, %p failed to move to trashcan, out of memory", pCacheObj->name, pNode->key, pNode->data);
    return;
  }

  pElem->pData = pNode;

  __trashcan_wr_lock(pCacheObj);

  pNode->inTrashcan = true;
  pNode->pTNodeHeader = pElem;

  // push to the front of the list (prev stays NULL from calloc)
  pElem->next = pCacheObj->pTrash;
  if (pCacheObj->pTrash) {
    pCacheObj->pTrash->prev = pElem;
  }

  pCacheObj->pTrash = pElem;
  pCacheObj->numOfElemsInTrash++;

  // read the counter while still holding the lock so that the log value is consistent
  uint32_t numInTrash = pCacheObj->numOfElemsInTrash;
  __trashcan_unlock(pCacheObj);

  uDebug("cache:%s key:%p, %p move to trashcan, pTrashElem:%p, numOfElem in trashcan:%d", pCacheObj->name, pNode->key,
         pNode->data, pElem, numInTrash);
}

H
Haojun Liao 已提交
759
/*
 * Walk the trashcan list and destroy the elements that may be released.
 *
 * An element is released when `force` is true or when the reference count of
 * the node it wraps is zero. After every removal the walk restarts from the
 * current list head. The whole operation runs under the trashcan write lock.
 */
void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) {
  __trashcan_wr_lock(pCacheObj);

  if (pCacheObj->numOfElemsInTrash == 0) {
    // the counter says "empty"; a non-NULL head means the bookkeeping is broken
    if (pCacheObj->pTrash != NULL) {
      pCacheObj->pTrash = NULL;
      uError("cache:%s, key:inconsistency data in cache, numOfElem in trashcan:%d", pCacheObj->name,
             pCacheObj->numOfElemsInTrash);
    }

    __trashcan_unlock(pCacheObj);
    return;
  }

  uDebug("cache:%s start to cleanup trashcan, numOfElem in trashcan:%d, free:%s", pCacheObj->name,
         pCacheObj->numOfElemsInTrash, (force ? "true" : "false"));

  STrashElem *pCur = pCacheObj->pTrash;
  while (pCur != NULL) {
    T_REF_VAL_CHECK(pCur->pData);
    assert(pCur->next != pCur && pCur->prev != pCur);

    // still referenced and not forced: leave it in place and look at the next one
    if (!force && (T_REF_VAL_GET(pCur->pData) != 0)) {
      pCur = pCur->next;
      continue;
    }

    uDebug("cache:%s, key:%p, %p removed from trashcan. numOfElem in trashcan:%d", pCacheObj->name, pCur->pData->key,
           pCur->pData->data, pCacheObj->numOfElemsInTrash - 1);

    doRemoveElemInTrashcan(pCacheObj, pCur);
    doDestroyTrashcanElem(pCacheObj, pCur);

    // pCur is gone now; start over from the (possibly new) head
    pCur = pCacheObj->pTrash;
  }

  __trashcan_unlock(pCacheObj);
}

/*
 * Tear down a cache object: remove the hashed nodes via doRemoveNodeFn,
 * force-empty the trashcan, destroy the trashcan lock and finally free the
 * entry list, the name and the cache object itself.
 */
void doCleanupDataCache(SCacheObj *pCacheObj) {
  SCacheObjTravSup travSup = {0};
  travSup.pCacheObj = pCacheObj;
  travSup.fp = NULL;
  travSup.time = taosGetTimestampMs();

  doTraverseElems(pCacheObj, doRemoveNodeFn, &travSup);

  // todo memory leak if there are object with refcount greater than 0 in hash table?
  taosTrashcanEmpty(pCacheObj, true);

  __trashcan_lock_destroy(pCacheObj);

  taosMemoryFreeClear(pCacheObj->pEntryList);
  taosMemoryFreeClear(pCacheObj->name);
  taosMemoryFree(pCacheObj);
}
810

S
cache  
Shengliang Guan 已提交
811
/*
 * Traverse the cache with doRemoveExpiredFn, handing it `time` as the
 * reference timestamp together with the optional callback `fp` and its
 * argument `param1`.
 */
static void doCacheRefresh(SCacheObj *pCacheObj, int64_t time, __cache_trav_fn_t fp, void *param1) {
  assert(pCacheObj != NULL);

  SCacheObjTravSup travSup = {0};
  travSup.pCacheObj = pCacheObj;
  travSup.fp = fp;
  travSup.time = time;
  travSup.param1 = param1;

  doTraverseElems(pCacheObj, doRemoveExpiredFn, &travSup);
}

818
void taosCacheRefreshWorkerUnexpectedStopped(void) {
S
cache  
Shengliang Guan 已提交
819 820
  if (!refreshWorkerNormalStopped) {
    refreshWorkerUnexpectedStopped = true;
821 822 823
  }
}

S
cache  
Shengliang Guan 已提交
824
void *taosCacheTimedRefresh(void *handle) {
825 826
  assert(pCacheArrayList != NULL);
  uDebug("cache refresh thread starts");
827

H
Haojun Liao 已提交
828
  setThreadName("cacheRefresh");
829

S
cache  
Shengliang Guan 已提交
830 831
  const int32_t SLEEP_DURATION = 500;  // 500 ms
  int64_t       count = 0;
wafwerar's avatar
wafwerar 已提交
832 833 834 835 836
#ifdef WINDOWS
  if (taosCheckCurrentInDll()) {
    atexit(taosCacheRefreshWorkerUnexpectedStopped);
  }
#endif
837

S
cache  
Shengliang Guan 已提交
838
  while (1) {
839
    taosMsleep(SLEEP_DURATION);
840 841 842
    if (stopRefreshWorker) {
      goto _end;
    }
H
Haojun Liao 已提交
843

wafwerar's avatar
wafwerar 已提交
844
    taosThreadMutexLock(&guard);
845
    size_t size = taosArrayGetSize(pCacheArrayList);
wafwerar's avatar
wafwerar 已提交
846
    taosThreadMutexUnlock(&guard);
847

848
    count += 1;
849

S
cache  
Shengliang Guan 已提交
850
    for (int32_t i = 0; i < size; ++i) {
wafwerar's avatar
wafwerar 已提交
851
      taosThreadMutexLock(&guard);
S
cache  
Shengliang Guan 已提交
852
      SCacheObj *pCacheObj = taosArrayGetP(pCacheArrayList, i);
853

854
      if (pCacheObj == NULL) {
855
        uError("object is destroyed. ignore and try next");
wafwerar's avatar
wafwerar 已提交
856
        taosThreadMutexUnlock(&guard);
857
        continue;
858
      }
859

860 861
      // check if current cache object will be deleted every 500ms.
      if (pCacheObj->deleting) {
862 863 864
        taosArrayRemove(pCacheArrayList, i);
        size = taosArrayGetSize(pCacheArrayList);

S
cache  
Shengliang Guan 已提交
865 866
        uDebug("%s is destroying, remove it from refresh list, remain cache obj:%" PRIzu, pCacheObj->name, size);
        pCacheObj->deleting = 0;  // reset the deleting flag to enable pCacheObj to continue releasing resources.
867

wafwerar's avatar
wafwerar 已提交
868
        taosThreadMutexUnlock(&guard);
869
        continue;
870
      }
871

wafwerar's avatar
wafwerar 已提交
872
      taosThreadMutexUnlock(&guard);
873

874 875 876 877
      if ((count % pCacheObj->checkTick) != 0) {
        continue;
      }

878
      size_t elemInHash = pCacheObj->numOfElems;
879 880 881 882 883 884 885 886 887 888
      if (elemInHash + pCacheObj->numOfElemsInTrash == 0) {
        continue;
      }

      uDebug("%s refresh thread scan", pCacheObj->name);
      pCacheObj->statistics.refreshCount++;

      // refresh data in hash table
      if (elemInHash > 0) {
        int64_t now = taosGetTimestampMs();
A
AlexDuan 已提交
889
        doCacheRefresh(pCacheObj, now, NULL, NULL);
890 891 892 893
      }

      taosTrashcanEmpty(pCacheObj, false);
    }
894 895
  }

S
cache  
Shengliang Guan 已提交
896
_end:
897
  taosArrayDestroy(pCacheArrayList);
H
Haojun Liao 已提交
898 899

  pCacheArrayList = NULL;
wafwerar's avatar
wafwerar 已提交
900
  taosThreadMutexDestroy(&guard);
S
cache  
Shengliang Guan 已提交
901
  refreshWorkerNormalStopped = true;
H
Haojun Liao 已提交
902

903
  uDebug("cache refresh thread quits");
904
  return NULL;
dengyihao's avatar
dengyihao 已提交
905
}
906

S
cache  
Shengliang Guan 已提交
907
/*
 * Run one refresh pass over pCacheObj using the current time in milliseconds.
 * `fp` and `param1` are forwarded to doCacheRefresh. A NULL cache is ignored.
 */
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void *param1) {
  if (pCacheObj != NULL) {
    const int64_t nowMs = taosGetTimestampMs();
    doCacheRefresh(pCacheObj, nowMs, fp, param1);
  }
}
915

H
Haojun Liao 已提交
916 917
void taosStopCacheRefreshWorker(void) {
  stopRefreshWorker = true;
wafwerar's avatar
wafwerar 已提交
918 919
  TdThreadOnce tmp = PTHREAD_ONCE_INIT;
  if (memcmp(&cacheRefreshWorker, &tmp, sizeof(TdThreadOnce)) != 0) taosThreadJoin(cacheRefreshWorker, NULL);
920
  taosArrayDestroy(pCacheArrayList);
H
Haojun Liao 已提交
921 922
}

L
Liu Jicong 已提交
923
/* Number of objects held by the cache: those in the hash table plus those in the trashcan. */
size_t taosCacheGetNumOfObj(const SCacheObj *pCacheObj) {
  const size_t total = pCacheObj->numOfElems + pCacheObj->numOfElemsInTrash;
  return total;
}
H
Haojun Liao 已提交
924

L
Liu Jicong 已提交
925
SCacheIter *taosCacheCreateIter(const SCacheObj *pCacheObj) {
H
Haojun Liao 已提交
926
  ASSERT(pCacheObj != NULL);
L
Liu Jicong 已提交
927 928
  SCacheIter *pIter = taosMemoryCalloc(1, sizeof(SCacheIter));
  pIter->pCacheObj = (SCacheObj *)pCacheObj;
H
Haojun Liao 已提交
929
  pIter->entryIndex = -1;
L
Liu Jicong 已提交
930
  pIter->index = -1;
H
Haojun Liao 已提交
931 932 933
  return pIter;
}

L
Liu Jicong 已提交
934 935
/*
 * Advance the iterator to the next cached element.
 *
 * The iterator works on a per-entry snapshot: when the current snapshot is
 * used up, the references taken on its nodes are released, the next non-empty
 * hash entry is located, and under that entry's read latch every node of the
 * entry is copied into pIter->pCurrent with its reference count incremented.
 *
 * Returns true when pIter->index designates a valid element of the snapshot,
 * false when all entries have been visited or when growing the snapshot buffer
 * fails (terrno is then TSDB_CODE_OUT_OF_MEMORY). Calling the function again
 * after it returned false is safe and keeps returning false.
 */
bool taosCacheIterNext(SCacheIter *pIter) {
  SCacheObj *pCacheObj = pIter->pCacheObj;

  if (pIter->index + 1 >= pIter->numOfObj) {
    // release the reference for all objects in the snapshot
    for (int32_t i = 0; i < pIter->numOfObj; ++i) {
      if (pIter->pCurrent[i] == NULL) {
        // already released by an earlier call that ended with `false`
        continue;
      }

      char *p = pIter->pCurrent[i]->data;
      taosCacheRelease(pCacheObj, (void **)&p, false);
      pIter->pCurrent[i] = NULL;
    }

    if (pIter->entryIndex + 1 >= pCacheObj->capacity) {
      return false;
    }

    while (1) {
      pIter->entryIndex++;
      if (pIter->entryIndex >= pCacheObj->capacity) {
        return false;
      }

      SCacheEntry *pEntry = &pCacheObj->pEntryList[pIter->entryIndex];
      taosRLockLatch(&pEntry->latch);

      if (pEntry->num == 0) {
        taosRUnLockLatch(&pEntry->latch);
        continue;
      }

      // grow the snapshot buffer when this entry holds more nodes than the previous snapshot
      if (pIter->numOfObj < pEntry->num) {
        char *tmp = taosMemoryRealloc(pIter->pCurrent, pEntry->num * POINTER_BYTES);
        if (tmp == NULL) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          taosRUnLockLatch(&pEntry->latch);
          return false;
        }

        pIter->pCurrent = (SCacheNode **)tmp;
      }

      // take a reference on every node of the entry so the snapshot stays valid after the latch is dropped
      SCacheNode *pNode = pEntry->next;
      for (int32_t i = 0; i < pEntry->num; ++i) {
        ASSERT(pNode != NULL);

        pIter->pCurrent[i] = pNode;
        int32_t ref = T_REF_INC(pIter->pCurrent[i]);
        ASSERT(ref >= 1);

        pNode = pNode->pNext;
      }

      pIter->numOfObj = pEntry->num;
      taosRUnLockLatch(&pEntry->latch);

      pIter->index = -1;
      break;
    }
  }

  pIter->index += 1;
  return true;
}

L
Liu Jicong 已提交
997 998
void *taosCacheIterGetData(const SCacheIter *pIter, size_t *len) {
  SCacheNode *pNode = pIter->pCurrent[pIter->index];
H
Haojun Liao 已提交
999 1000 1001 1002
  *len = pNode->dataLen;
  return pNode->data;
}

L
Liu Jicong 已提交
1003 1004
void *taosCacheIterGetKey(const SCacheIter *pIter, size_t *len) {
  SCacheNode *pNode = pIter->pCurrent[pIter->index];
H
Haojun Liao 已提交
1005 1006 1007 1008
  *len = pNode->keyLen;
  return pNode->key;
}

L
Liu Jicong 已提交
1009
/*
 * Destroy an iterator created by taosCacheCreateIter().
 *
 * Any node of the current snapshot that still holds a reference taken by
 * taosCacheIterNext() is released first, so abandoning an iteration before it
 * is exhausted does not leave nodes pinned. A NULL iterator is ignored.
 */
void taosCacheDestroyIter(SCacheIter *pIter) {
  if (pIter == NULL) {
    return;
  }

  // slots already released by taosCacheIterNext() have been set to NULL
  for (int32_t i = 0; i < pIter->numOfObj; ++i) {
    if (pIter->pCurrent[i] == NULL) {
      continue;
    }

    char *p = pIter->pCurrent[i]->data;
    taosCacheRelease(pIter->pCacheObj, (void **)&p, false);
    pIter->pCurrent[i] = NULL;
  }

  taosMemoryFreeClear(pIter->pCurrent);
  taosMemoryFreeClear(pIter);
}