tcache.c 30.8 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#define _DEFAULT_SOURCE
S
cache  
Shengliang Guan 已提交
17
#include "tcache.h"
18
#include "taoserror.h"
S
log  
Shengliang Guan 已提交
19
#include "tlog.h"
H
hzcheng 已提交
20 21
#include "tutil.h"

H
Haojun Liao 已提交
22 23 24
// Upper bound on the number of hash slots (16M). Parenthesized so the macro expands safely inside any expression.
#define CACHE_MAX_CAPACITY (1024 * 1024 * 16)
// Default number of hash slots (4K). Parenthesized for the same reason.
#define CACHE_DEFAULT_CAPACITY (1024 * 4)

25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
// Handle of the single refresh thread shared by every cache object; created by doInitRefreshThread().
static pthread_t       cacheRefreshWorker = {0};
// Makes doInitRefreshThread() run exactly once (see doRegisterCacheObj()).
static pthread_once_t  cacheThreadInit = PTHREAD_ONCE_INIT;
// Protects pCacheArrayList.
static pthread_mutex_t guard = PTHREAD_MUTEX_INITIALIZER;
// All cache objects registered through doRegisterCacheObj(); scanned by the refresh thread.
static SArray         *pCacheArrayList = NULL;
// Checked by the refresh thread after each sleep; when set, the thread leaves its loop.
static bool            stopRefreshWorker = false;
// Read by taosCacheCleanup() and the atexit handler; presumably set when the refresh thread exits normally
// (the setter is not visible here — TODO confirm).
static bool            refreshWorkerNormalStopped = false;
// Set by the atexit handler taosCacheRefreshWorkerUnexpectedStopped() when the thread did not stop normally.
// NOTE(review): these flags are plain bools shared between threads; atomics would make the accesses well-defined.
static bool            refreshWorkerUnexpectedStopped = false;

/*
 * A single cached object. taosCreateCacheNode() allocates the header, the data and the key as one block, in that
 * order, so `data` and `key` point into the same allocation as the node itself.
 */
typedef struct SCacheNode {
  uint64_t           addedTime;   // time (ms) when this element was added or updated into the cache
  uint64_t           lifespan;    // life duration (ms) after which this element should be removed from the cache
  int64_t            expireTime;  // expire time (ms): addedTime + lifespan, pushed forward on release if extendLifespan
  uint64_t           signature;   // the node's own address; a mismatch marks an invalid or already released node
  struct STrashElem *pTNodeHeader;    // trashcan element wrapping this node, set when the node is moved to the trashcan
  uint16_t           keyLen: 15;    // max key size: 32767 bytes. NOTE(review): uint16_t bit-fields are implementation-defined in C11
  bool               inTrashcan : 1;  // denote if it is in trash or not
  uint32_t           size;            // allocated size for current SCacheNode (header + data + key)
  uint32_t           dataLen;         // length of data in bytes
  T_REF_DECLARE()                     // reference count, used through T_REF_INC / T_REF_DEC / T_REF_VAL_GET
  struct SCacheNode *pNext;           // next node in the same hash slot
  char              *key;             // key bytes, stored after the data; NOT NUL-terminated
  char              *data;            // user data, stored right after this header
} SCacheNode;

// One hash slot: a singly-linked list of the nodes whose keys hash to this slot.
typedef struct SCacheEntry {
  int32_t     num;      // number of elements in current entry
  SRWLatch    latch;    // entry latch: write-locked for insert/remove, read-locked for lookup
  SCacheNode *next;     // head of the node list
} SCacheEntry;

// Element of the trashcan, a doubly-linked list (head: SCacheObj::pTrash) of nodes that were taken out of the hash
// table while still referenced.
typedef struct STrashElem {
  struct STrashElem *prev;   // previous element, NULL for the list head
  struct STrashElem *next;   // next element, NULL for the list tail
  SCacheNode        *pData;  // the wrapped cache node
} STrashElem;

H
Haojun Liao 已提交
61 62 63 64 65 66 67 68
// Iterator state over a cache object's hash slots. Not used in this part of the file; the field meanings below are
// inferred from their names — TODO confirm against the iterator functions.
typedef struct SCacheIter {
  SCacheObj   *pCacheObj;   // cache being iterated
  SCacheNode **pCurrent;    // presumably a snapshot of the nodes of the current slot
  int32_t      entryIndex;  // presumably the index of the current slot in pEntryList
  int32_t      index;       // presumably the position inside pCurrent
  int32_t      numOfObj;    // presumably the number of nodes in pCurrent
} SCacheIter;

69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
/*
 * A cache object: a fixed-size hash table of nodes plus a trashcan.
 *
 * When a new node is put into the cache and a node with the same key already exists:
 * 1. if the old node is not referenced, it is destroyed immediately;
 * 2. otherwise, the old node is moved to pTrash and the new node replaces it in the hash table.
 *
 * A node in pTrash is destroyed once it is no longer referenced (see taosTrashcanEmpty() and taosCacheRelease()).
 */
struct SCacheObj {
  int64_t            sizeInBytes;  // total allocated buffer in this hash table, SCacheObj is not included.
  int64_t            refreshTime;  // refresh interval in milliseconds
  char              *name;         // cache name used in log messages (owned copy)
  SCacheStatis       statistics;   // hit / miss / total access counters

  SCacheEntry       *pEntryList;   // array of `capacity` hash slots
  size_t             capacity;     // number of slots
  size_t             numOfElems;         // number of elements in cache
  _hash_fn_t         hashFp;       // hash function
  __cache_free_fn_t  freeFp;       // optional callback applied to a node's data when the node is destroyed

  uint32_t           numOfElemsInTrash;  // number of element in trash
  STrashElem        *pTrash;             // head of the trashcan list, protected by `lock`

  uint8_t            deleting;           // set the deleting flag to stop refreshing ASAP.
  pthread_t          refreshWorker;      // NOTE(review): not assigned anywhere in the visible code
  bool               extendLifespan;  // on release, push a node's expire time forward by its lifespan
  int64_t            checkTick;       // refreshTime divided by the refresh thread's 500 ms sleep interval
#if defined(LINUX)
  pthread_rwlock_t lock;  // trashcan lock
#else
  pthread_mutex_t lock;  // trashcan lock
#endif
};

// Argument bundle handed to the per-node callbacks of doTraverseElems().
typedef struct SCacheObjTravSup {
  SCacheObj        *pCacheObj;  // cache being traversed
  int64_t           time;       // reference time (ms) used for expiration checks
  __cache_trav_fn_t fp;         // optional user callback applied to the data of every node that is kept
  void             *param1;     // opaque argument forwarded to fp
} SCacheObjTravSup;

static FORCE_INLINE void __trashcan_wr_lock(SCacheObj *pCacheObj) {
  // Exclusive access to the trashcan: a write lock where rwlocks are used (Linux), a plain mutex elsewhere.
#if !defined(LINUX)
  pthread_mutex_lock(&pCacheObj->lock);
#else
  pthread_rwlock_wrlock(&pCacheObj->lock);
#endif
}

118
static FORCE_INLINE void __trashcan_unlock(SCacheObj *pCacheObj) {
  // Releases the lock taken by __trashcan_wr_lock().
#if !defined(LINUX)
  pthread_mutex_unlock(&pCacheObj->lock);
#else
  pthread_rwlock_unlock(&pCacheObj->lock);
#endif
}

126
static FORCE_INLINE int32_t __trashcan_lock_init(SCacheObj *pCacheObj) {
  // Returns 0 on success, otherwise the error number reported by the pthread init call.
  int32_t rc;
#if !defined(LINUX)
  rc = pthread_mutex_init(&pCacheObj->lock, NULL);
#else
  rc = pthread_rwlock_init(&pCacheObj->lock, NULL);
#endif
  return rc;
}

134
static FORCE_INLINE void __trashcan_lock_destroy(SCacheObj *pCacheObj) {
  // Counterpart of __trashcan_lock_init(); the result of the destroy call is ignored.
#if !defined(LINUX)
  pthread_mutex_destroy(&pCacheObj->lock);
#else
  pthread_rwlock_destroy(&pCacheObj->lock);
#endif
}

142 143 144 145 146 147 148 149 150 151
/**
 * Destroy a cache object together with everything it owns: hash-table nodes, trashcan, lock, entry array and name.
 * @param pCacheObj  cache object to destroy
 */
static void doCleanupDataCache(SCacheObj *pCacheObj);

/**
 * Body of the shared refresh thread. It periodically removes, from the hash table and from the trashcan of every
 * registered cache object, the nodes that are no longer referenced, once per pCacheObj->refreshTime.
 * @param handle  thread argument (doInitRefreshThread() passes NULL)
 */
static void *taosCacheTimedRefresh(void *handle);
153

H
Haojun Liao 已提交
154
static void doInitRefreshThread(void) {
155 156 157 158 159 160 161 162 163 164
  pCacheArrayList = taosArrayInit(4, POINTER_BYTES);

  pthread_attr_t thattr;
  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);

  pthread_create(&cacheRefreshWorker, &thattr, taosCacheTimedRefresh, NULL);
  pthread_attr_destroy(&thattr);
}

S
cache  
Shengliang Guan 已提交
165
pthread_t doRegisterCacheObj(SCacheObj *pCacheObj) {
  // The shared refresh thread and its object list are created lazily, on the first registration only.
  (void)pthread_once(&cacheThreadInit, doInitRefreshThread);

  // Append the object to the list scanned by the refresh thread.
  pthread_mutex_lock(&guard);
  (void)taosArrayPush(pCacheArrayList, &pCacheObj);
  pthread_mutex_unlock(&guard);

  pthread_t worker = cacheRefreshWorker;
  return worker;
}

H
hzcheng 已提交
175 176 177 178 179 180
/**
 * @param key      key of object for hash, usually a null-terminated string
 * @param keyLen   length of key
 * @param pData    actually data. required a consecutive memory block, no pointer is allowed
 *                 in pData. Pointer copy causes memory access error.
 * @param size     size of block
181
 * @param lifespan total survial expiredTime from now
182
 * @return         SCacheNode
H
hzcheng 已提交
183
 */
184
static SCacheNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size,
S
cache  
Shengliang Guan 已提交
185
                                           uint64_t duration);
H
hzcheng 已提交
186 187

/**
188
 * addedTime object node into trash, and this object is closed for referencing if it is addedTime to trash
H
hzcheng 已提交
189
 * It will be removed until the pNode->refCount == 0
190
 * @param pCacheObj    Cache object
H
hzcheng 已提交
191 192
 * @param pNode   Cache slot object
 */
193
static void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheNode *pNode);
194

H
hzcheng 已提交
195 196 197
/**
 * remove nodes in trash with refCount == 0 in cache
 * @param pNode
198
 * @param pCacheObj
H
hzcheng 已提交
199 200 201
 * @param force   force model, if true, remove data in trash without check refcount.
 *                may cause corruption. So, forece model only applys before cache is closed
 */
H
Haojun Liao 已提交
202
static void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force);
H
hzcheng 已提交
203 204 205

/**
 * release node
206
 * @param pCacheObj      cache object
H
Haojun Liao 已提交
207
 * @param pNode          data node
H
hzcheng 已提交
208
 */
209
static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheNode *pNode) {
H
hzcheng 已提交
210
  if (pNode->signature != (uint64_t)pNode) {
S
slguan 已提交
211
    uError("key:%s, %p data is invalid, or has been released", pNode->key, pNode);
H
hzcheng 已提交
212 213
    return;
  }
214

215
  atomic_sub_fetch_64(&pCacheObj->sizeInBytes, pNode->size);
H
Haojun Liao 已提交
216

H
Haojun Liao 已提交
217
  uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, total num:%d size:%" PRId64 "bytes",
218
         pCacheObj->name, pNode->key, pNode->data, pNode->size, (int)pCacheObj->numOfElems - 1, pCacheObj->sizeInBytes);
H
Haojun Liao 已提交
219 220 221 222

  if (pCacheObj->freeFp) {
    pCacheObj->freeFp(pNode->data);
  }
223

H
hzcheng 已提交
224 225 226
  free(pNode);
}

S
cache  
Shengliang Guan 已提交
227 228
/*
 * Unlink pElem from the trashcan list (the caller holds the trashcan lock) and return its successor.
 * Returns NULL without touching the list when the wrapped node fails the signature check.
 */
static FORCE_INLINE STrashElem *doRemoveElemInTrashcan(SCacheObj *pCacheObj, STrashElem *pElem) {
  SCacheNode *pNode = pElem->pData;
  if (pNode->signature != (uint64_t)pNode) {
    uWarn("key:sig:0x%" PRIx64 " %p data has been released, ignore", pNode->signature, pNode);
    return NULL;
  }

  STrashElem *pPrev = pElem->prev;
  STrashElem *pNext = pElem->next;

  pCacheObj->numOfElemsInTrash--;

  if (pPrev == NULL) {
    pCacheObj->pTrash = pNext;  // pElem was the list head
  } else {
    pPrev->next = pNext;
  }

  if (pNext != NULL) {
    pNext->prev = pPrev;
  }

  // an empty trashcan must not keep a dangling head
  assert(pCacheObj->numOfElemsInTrash != 0 || pCacheObj->pTrash == NULL);

  return pNext;
}

S
cache  
Shengliang Guan 已提交
253
/*
 * Free an already unlinked trashcan element together with the node it wraps. The node's data is first passed to the
 * user free callback, if one is set.
 */
static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj *pCacheObj, STrashElem *pElem) {
  SCacheNode *pNode = pElem->pData;

  if (pCacheObj->freeFp != NULL) {
    (*pCacheObj->freeFp)(pNode->data);
  }

  free(pNode);
  free(pElem);
}
H
hzcheng 已提交
261

262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277
/* Insert pNode as the new head of the slot list; the caller holds the slot's write latch. */
static void pushfrontNodeInEntryList(SCacheEntry *pEntry, SCacheNode *pNode) {
  assert(pEntry != NULL && pNode != NULL);

  SCacheNode *oldHead = pEntry->next;
  pNode->pNext = oldHead;
  pEntry->next = pNode;
  ++pEntry->num;
}

/*
 * Unlink pNode from the slot list. prev is its predecessor, or NULL when pNode is the head. The caller holds the
 * slot's write latch.
 */
static void removeNodeInEntryList(SCacheEntry* pe, SCacheNode* prev, SCacheNode* pNode) {
  ASSERT(prev != NULL || pe->next == pNode);

  SCacheNode **pLink = (prev != NULL) ? &prev->pNext : &pe->next;
  *pLink = pNode->pNext;

  pNode->pNext = NULL;
  --pe->num;
}

/* Map a key to its hash slot. */
static FORCE_INLINE SCacheEntry* doFindEntry(SCacheObj* pCacheObj, const void* key, size_t keyLen) {
  _hash_fn_t fp = pCacheObj->hashFp;
  uint32_t   hashVal = fp(key, keyLen);
  return pCacheObj->pEntryList + (int32_t)(hashVal % pCacheObj->capacity);
}

/*
 * Find the node with the given key in a slot list. *prev receives the predecessor of the match; it keeps the caller's
 * initial value when the match is the head. Returns NULL when the key is absent.
 */
static FORCE_INLINE SCacheNode *
doSearchInEntryList(SCacheEntry *pe, const void *key, size_t keyLen, SCacheNode** prev) {
  for (SCacheNode *p = pe->next; p != NULL; p = p->pNext) {
    if (p->keyLen == keyLen && memcmp(p->key, key, keyLen) == 0) {
      return p;
    }
    *prev = p;
  }

  return NULL;
}

H
Haojun Liao 已提交
302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352
/*
 * Traversal callback used by the refresh pass. A node that is both expired and unreferenced is destroyed, and false
 * is returned so the traversal unlinks it. Every other node is offered to the optional user callback and kept.
 */
static bool doRemoveExpiredFn(void *param, SCacheNode* pNode) {
  SCacheObjTravSup *pSup = (SCacheObjTravSup *)param;

  bool removable = ((int64_t)pNode->expireTime < pSup->time) && (T_REF_VAL_GET(pNode) <= 0);
  if (removable) {
    taosCacheReleaseNode(pSup->pCacheObj, pNode);
    return false;
  }

  if (pSup->fp != NULL) {
    pSup->fp(pNode->data, pSup->param1);
  }

  return true;
}

/*
 * Traversal callback that takes every node out of the hash table. Unreferenced nodes are destroyed at once;
 * referenced ones are parked in the trashcan. Always returns false so the traversal unlinks the node.
 */
static bool doRemoveNodeFn(void *param, SCacheNode *pNode) {
  SCacheObj *pCacheObj = ((SCacheObjTravSup *)param)->pCacheObj;

  if (T_REF_VAL_GET(pNode) != 0) {
    taosAddToTrashcan(pCacheObj, pNode);
  } else {
    taosCacheReleaseNode(pCacheObj, pNode);
  }

  return false;
}

/*
 * Round a requested slot count to a power-of-two multiple of CACHE_DEFAULT_CAPACITY, clamped to
 * [CACHE_DEFAULT_CAPACITY, CACHE_MAX_CAPACITY]. Not called in the visible code; taosCacheInit() uses a fixed
 * capacity.
 */
static FORCE_INLINE int32_t getCacheCapacity(int32_t length) {
  if (length < CACHE_DEFAULT_CAPACITY) {
    return CACHE_DEFAULT_CAPACITY;
  }

  if (length > CACHE_MAX_CAPACITY) {
    return CACHE_MAX_CAPACITY;
  }

  int32_t cap = CACHE_DEFAULT_CAPACITY;
  while (cap < length && cap < CACHE_MAX_CAPACITY) {
    cap <<= 1;
  }

  if (cap > CACHE_MAX_CAPACITY) {
    cap = CACHE_MAX_CAPACITY;
  }
  return cap;
}

S
cache  
Shengliang Guan 已提交
353 354 355
/**
 * Create a cache object and register it with the shared refresh thread, which is started on first use.
 *
 * @param keyType               key type, used to pick the default hash function
 * @param refreshTimeInSeconds  refresh interval in seconds, must be positive
 * @param extendLifespan        if true, releasing a node pushes its expire time forward by its lifespan
 * @param fn                    callback applied to a node's data when the node is destroyed, may be NULL
 * @param cacheName             name used in log messages; it is copied and must not be NULL
 * @return                      the new cache object, or NULL on invalid arguments or resource failure
 */
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn,
                         const char *cacheName) {
  const int32_t SLEEP_DURATION = 500;  // 500 ms, the sleep interval of the refresh thread

  // strdup(NULL) below would be undefined behavior
  if (refreshTimeInSeconds <= 0 || cacheName == NULL) {
    return NULL;
  }

  SCacheObj *pCacheObj = (SCacheObj *)calloc(1, sizeof(SCacheObj));
  if (pCacheObj == NULL) {
    uError("failed to allocate memory, reason:%s", strerror(errno));
    return NULL;
  }

  // TODO add the auto extend procedure
  pCacheObj->capacity = CACHE_DEFAULT_CAPACITY;
  pCacheObj->pEntryList = calloc(pCacheObj->capacity, sizeof(SCacheEntry));
  if (pCacheObj->pEntryList == NULL) {
    uError("failed to allocate memory, reason:%s", strerror(errno));
    free(pCacheObj);
    return NULL;
  }

  // set free cache node callback function
  pCacheObj->hashFp         = taosGetDefaultHashFunction(keyType);
  pCacheObj->freeFp         = fn;
  pCacheObj->refreshTime    = refreshTimeInSeconds * 1000;
  pCacheObj->checkTick      = pCacheObj->refreshTime / SLEEP_DURATION;
  pCacheObj->extendLifespan = extendLifespan;  // the TTL after the last access

  // pthread init functions return the error number directly instead of setting errno
  int32_t code = __trashcan_lock_init(pCacheObj);
  if (code != 0) {
    uError("failed to init lock, reason:%s", strerror(code));
    tfree(pCacheObj->pEntryList);
    free(pCacheObj);
    return NULL;
  }

  pCacheObj->name = strdup(cacheName);
  if (pCacheObj->name == NULL) {
    uError("failed to allocate memory, reason:%s", strerror(errno));
    __trashcan_lock_destroy(pCacheObj);
    tfree(pCacheObj->pEntryList);
    free(pCacheObj);
    return NULL;
  }

  doRegisterCacheObj(pCacheObj);
  return pCacheObj;
}
H
hzcheng 已提交
395

S
cache  
Shengliang Guan 已提交
396 397
/**
 * Insert a copy of pData under key, replacing any node that already has the same key.
 *
 * The replaced node is destroyed immediately when nobody references it, otherwise it is moved to the trashcan.
 * The new node starts with one reference owned by the caller, which is expected to release it with
 * taosCacheRelease().
 *
 * @param durationMS  lifespan of the new node in milliseconds
 * @return            address of the cached copy of the data, or NULL if the cache is unusable or allocation failed
 */
void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize,
                   int32_t durationMS) {
  bool unusable = (pCacheObj == NULL) || (pCacheObj->pEntryList == NULL) || (pCacheObj->deleting == 1);
  if (unusable) {
    return NULL;
  }

  SCacheNode *pNew = taosCreateCacheNode(key, keyLen, pData, dataSize, durationMS);
  if (pNew == NULL) {
    uError("cache:%s, key:%p, failed to added into cache, out of memory", pCacheObj->name, key);
    return NULL;
  }

  // the reference handed back to the caller
  T_REF_INC(pNew);

  SCacheEntry *pe = doFindEntry(pCacheObj, key, keyLen);
  taosWLockLatch(&pe->latch);

  SCacheNode *prev = NULL;
  SCacheNode *pOld = doSearchInEntryList(pe, key, keyLen, &prev);
  bool        isNewKey = (pOld == NULL);

  if (!isNewKey) {
    // take the previous node with the same key out of the slot before inserting the new one
    removeNodeInEntryList(pe, prev, pOld);

    if (T_REF_VAL_GET(pOld) != 0) {
      taosAddToTrashcan(pCacheObj, pOld);
      uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNew->data, pOld->data);
    } else {
      if (pCacheObj->freeFp) {
        pCacheObj->freeFp(pOld->data);
      }

      atomic_sub_fetch_64(&pCacheObj->sizeInBytes, pOld->size);
      tfree(pOld);
    }
  }

  pushfrontNodeInEntryList(pe, pNew);
  if (isNewKey) {
    atomic_add_fetch_64(&pCacheObj->numOfElems, 1);
  }
  atomic_add_fetch_64(&pCacheObj->sizeInBytes, pNew->size);
  uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64
         ", totalNum:%d sizeInBytes:%" PRId64 "bytes size:%" PRId64 "bytes",
         pCacheObj->name, key, pNew->data, pNew->addedTime, pNew->expireTime, (int32_t)pCacheObj->numOfElems,
         pCacheObj->sizeInBytes, (int64_t)dataSize);

  taosWUnLockLatch(&pe->latch);
  return pNew->data;
}

H
Haojun Liao 已提交
453
/**
 * Look a key up and, on a hit, take one reference to its node. The hit/miss/total-access statistics are updated.
 *
 * @return the cached data (to be released by the caller), or NULL when the key is absent or the cache is closing
 */
void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) {
  if (pCacheObj == NULL || pCacheObj->deleting == 1) {
    return NULL;
  }

  // fast path: an empty cache is always a miss
  if (pCacheObj->numOfElems == 0) {
    atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
    return NULL;
  }

  SCacheEntry *pe = doFindEntry(pCacheObj, key, keyLen);
  SCacheNode  *prev = NULL;
  void        *pData = NULL;

  taosRLockLatch(&pe->latch);
  SCacheNode *pNode = doSearchInEntryList(pe, key, keyLen, &prev);
  if (pNode != NULL) {
    int32_t ref = T_REF_INC(pNode);
    ASSERT(ref > 0);
    pData = pNode->data;
  }
  taosRUnLockLatch(&pe->latch);

  if (pData == NULL) {
    atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
    uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
  } else {
    atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
    uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData,
           T_REF_VAL_GET(pNode));
  }

  atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
  return pData;
}

490 491
/**
 * Take one more reference to data that the caller already holds a reference to.
 *
 * @return data on success, or NULL for NULL arguments or a pointer that does not belong to a valid cache node
 */
void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
  if (pCacheObj == NULL || data == NULL) {
    return NULL;
  }

  // the node header sits immediately in front of the user data
  SCacheNode *pNode = (SCacheNode *)((char *)data - sizeof(SCacheNode));
  if (pNode->signature == (uint64_t)pNode) {
    int32_t ref = T_REF_INC(pNode);
    uDebug("cache:%s, data: %p acquired by data in cache, refcnt:%d", pCacheObj->name, pNode->data, ref);

    // the caller's own reference plus the new one
    assert(ref >= 2);
    return data;
  }

  uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, pNode);
  return NULL;
}

507
/**
 * Hand the reference held through *data over to the return value: *data is cleared and the data pointer returned.
 * The reference count itself is not changed.
 *
 * @return the data pointer, or NULL for NULL arguments or data not belonging to a valid cache node
 */
void *taosCacheTransferData(SCacheObj *pCacheObj, void **data) {
  if (pCacheObj == NULL || data == NULL || *data == NULL) {
    return NULL;
  }

  char       *pUserData = *data;
  SCacheNode *pNode = (SCacheNode *)(pUserData - sizeof(SCacheNode));
  if (pNode->signature != (uint64_t)pNode) {
    uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, pNode);
    return NULL;
  }

  assert(T_REF_VAL_GET(pNode) >= 1);

  // the caller's handle no longer refers to the node
  *data = NULL;
  return pUserData;
}
523

524
/**
 * Drop one reference to cached data obtained from taosCachePut() / taosCacheAcquireBy*().
 *
 * @param pCacheObj  cache object owning the data
 * @param data       address of the caller's data pointer; reset to NULL once the node passes the signature check
 * @param _remove    if true, also take the node out of the hash table. It is destroyed immediately when this was the
 *                   last reference, otherwise moved to the trashcan until the last reference is released. If false
 *                   and extendLifespan is set, the node's expire time is pushed forward by its lifespan.
 */
void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
  if (pCacheObj == NULL) {
    return;
  }

  if (data == NULL || (*data) == NULL) {
    uError("cache:%s, NULL data to release", pCacheObj->name);
    return;
  }

  // The operation of removal from hash table and addition to trashcan is not an atomic operation,
  // therefore the check for the empty of both the hash table and the trashcan has a race condition.
  // It happens when there is only one object in the cache, and two threads which has referenced this object
  // start to free the it simultaneously [TD-1569].
  SCacheNode *pNode = (SCacheNode *)((char *)(*data) - sizeof(SCacheNode));
  if (pNode->signature != (uint64_t)pNode) {
    uError("cache:%s, %p, release invalid cache data", pCacheObj->name, pNode);
    return;
  }

  *data = NULL;

  // note: extend lifespan before dec ref count
  bool inTrashcan = pNode->inTrashcan;

  if (pCacheObj->extendLifespan && (!inTrashcan) && (!_remove)) {
    atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs());
    uDebug("cache:%s, data:%p extend expire time: %" PRId64, pCacheObj->name, pNode->data, pNode->expireTime);
  }

  if (_remove) {
    // NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
    char *key = pNode->key;
    char *d = pNode->data;

    int32_t ref = T_REF_VAL_GET(pNode);
    uDebug("cache:%s, key:%p, %p is released, refcnt:%d, in trashcan:%d", pCacheObj->name, key, d, ref - 1, inTrashcan);

    /*
     * If it is not referenced by other users, remove it immediately. Otherwise move this node to trashcan wait for all
     * users releasing this resources.
     *
     * NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread
     * that tries to do the same thing.
     */
    if (inTrashcan) {
      ref = T_REF_VAL_GET(pNode);

      if (ref == 1) {
        // If it is the last ref, remove it from trashcan linked-list first, and then destroy it.Otherwise, it may be
        // destroyed by refresh worker if decrease ref count before removing it from linked-list.
        assert(pNode->pTNodeHeader->pData == pNode);

        __trashcan_wr_lock(pCacheObj);
        doRemoveElemInTrashcan(pCacheObj, pNode->pTNodeHeader);
        __trashcan_unlock(pCacheObj);

        ref = T_REF_DEC(pNode);
        assert(ref == 0);

        doDestroyTrashcanElem(pCacheObj, pNode->pTNodeHeader);
      } else {
        ref = T_REF_DEC(pNode);
        assert(ref >= 0);
      }
    } else {
      SCacheNode  *prev = NULL;
      SCacheEntry *pe = doFindEntry(pCacheObj, pNode->key, pNode->keyLen);

      taosWLockLatch(&pe->latch);

      // Look the key up while this thread still holds its reference. If another thread has already moved pNode to the
      // trashcan, the refresh worker may free it as soon as the count drops to zero, so pNode must not be dereferenced
      // after T_REF_DEC unless it is still linked in the hash table (which this latch protects).
      SCacheNode *p = doSearchInEntryList(pe, pNode->key, pNode->keyLen, &prev);

      if (p == pNode) {
        ref = T_REF_DEC(pNode);

        removeNodeInEntryList(pe, prev, p);
        // keep the element counter consistent with doTraverseElems(), which decrements it on every unlink
        atomic_sub_fetch_64(&pCacheObj->numOfElems, 1);

        uDebug("cache:%s, key:%p, %p successfully removed from hash table, refcnt:%d", pCacheObj->name, pNode->key,
               pNode->data, ref);
        if (ref > 0) {
          assert(pNode->pTNodeHeader == NULL);
          taosAddToTrashcan(pCacheObj, pNode);
        } else {  // ref == 0
          atomic_sub_fetch_64(&pCacheObj->sizeInBytes, pNode->size);

          int32_t size = (int32_t)pCacheObj->numOfElems;
          uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, totalNum:%d size:%" PRId64 "bytes",
                 pCacheObj->name, pNode->key, pNode->data, pNode->size, size, pCacheObj->sizeInBytes);

          if (pCacheObj->freeFp) {
            pCacheObj->freeFp(pNode->data);
          }

          free(pNode);
        }
      } else if (p != NULL) {
        // A newer node with the same key replaced pNode, which must therefore be in the trashcan already.
        uDebug(
            "cache:%s, key:%p, a new entry:%p found, refcnt:%d, prev entry:%p, refcnt:%d has been removed by "
            "others already, prev must in trashcan",
            pCacheObj->name, key, p->data, T_REF_VAL_GET(p), d, T_REF_VAL_GET(pNode) - 1);

        assert(p->pTNodeHeader == NULL && pNode->pTNodeHeader != NULL);
        (void)T_REF_DEC(pNode);
      } else {
        // Already removed from the hash table by another thread; only drop our reference.
        ref = T_REF_DEC(pNode);
        uDebug("cache:%s, key:%p, %p has been removed from hash table by others already, refcnt:%d", pCacheObj->name,
               key, d, ref);
      }

      // released on every path, including when the node is no longer in the hash table
      taosWUnLockLatch(&pe->latch);
    }

  } else {
    // NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
    char *key = pNode->key;
    char *p = pNode->data;

    int32_t ref = T_REF_DEC(pNode);
    uDebug("cache:%s, key:%p, %p released, refcnt:%d, data in trashcan:%d", pCacheObj->name, key, p, ref, inTrashcan);
  }
}
H
Haojun Liao 已提交
648

649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676
/**
 * Visit every node in the hash table, one slot at a time, under that slot's write latch.
 *
 * @param pCacheObj  cache object to traverse
 * @param fp         per-node callback. It returns true to keep the node, or false after it has released the node
 *                   (destroyed it or moved it to the trashcan), in which case the node is unlinked from its slot and
 *                   the element counters are decremented.
 * @param pSup       argument forwarded to fp
 */
void doTraverseElems(SCacheObj* pCacheObj, bool (*fp)(void *param, SCacheNode* pNode), SCacheObjTravSup* pSup) {
  int32_t numOfEntries = (int32_t)pCacheObj->capacity;
  for (int32_t i = 0; i < numOfEntries; ++i) {
    SCacheEntry *pEntry = &pCacheObj->pEntryList[i];
    if (pEntry->num == 0) {
      continue;
    }

    taosWLockLatch(&pEntry->latch);

    SCacheNode *prev = NULL;
    SCacheNode *pNode = pEntry->next;
    while (pNode != NULL) {
      // fp may free pNode, so read the successor first
      SCacheNode *next = pNode->pNext;

      if (fp(pSup, pNode)) {
        prev = pNode;
      } else {
        // Unlink from the actual predecessor. Always resetting the slot head here would drop every kept node in
        // front of the removed one from the list, leaking it.
        if (prev == NULL) {
          pEntry->next = next;
        } else {
          prev->pNext = next;
        }

        pEntry->num -= 1;
        atomic_sub_fetch_64(&pCacheObj->numOfElems, 1);
      }

      pNode = next;
    }

    taosWUnLockLatch(&pEntry->latch);
  }
}
H
Haojun Liao 已提交
677

678 679 680
/**
 * Remove every node from the hash table. Unreferenced nodes are destroyed at once, referenced ones go to the
 * trashcan. Then destroy the trashcan nodes that are no longer referenced (non-forced).
 */
void taosCacheEmpty(SCacheObj* pCacheObj) {
  SCacheObjTravSup sup = {0};
  sup.pCacheObj = pCacheObj;
  sup.fp = NULL;
  sup.time = taosGetTimestampMs();

  doTraverseElems(pCacheObj, doRemoveNodeFn, &sup);
  taosTrashcanEmpty(pCacheObj, false);
}

/**
 * Close a cache object.
 *
 * The object is marked as deleting, and the call then waits until one of the following happens:
 * - the `deleting` flag is cleared again, or
 * - the refresh thread is known to have stopped normally.
 * After that, everything the object owns is destroyed.
 *
 * If the refresh thread stopped unexpectedly, the call returns at once and nothing is freed.
 */
void taosCacheCleanup(SCacheObj *pCacheObj) {
  if (pCacheObj == NULL) return;

  pCacheObj->deleting = 1;

  // Wait for the refresh thread to let go of this object before destroying it. Inside a dll the child thread may be
  // killed before atexit handlers run, hence the stop flags.
  for (;;) {
    if (atomic_load_8(&pCacheObj->deleting) == 0) break;
    if (refreshWorkerNormalStopped) break;
    if (refreshWorkerUnexpectedStopped) return;

    taosMsleep(50);
  }

  uInfo("cache:%s will be cleaned up", pCacheObj->name);
  doCleanupDataCache(pCacheObj);
}

703 704
SCacheNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration) {
  size_t sizeInBytes = size + sizeof(SCacheNode) + keyLen;
705

706
  SCacheNode *pNewNode = calloc(1, sizeInBytes);
707
  if (pNewNode == NULL) {
708
    terrno = TSDB_CODE_OUT_OF_MEMORY;
709 710 711 712
    uError("failed to allocate memory, reason:%s", strerror(errno));
    return NULL;
  }

713 714
  pNewNode->data    = (char*)pNewNode + sizeof(SCacheNode);
  pNewNode->dataLen = size;
715 716
  memcpy(pNewNode->data, pData, size);

717 718
  pNewNode->key    = (char *)pNewNode + sizeof(SCacheNode) + size;
  pNewNode->keyLen = (uint16_t)keyLen;
719 720 721

  memcpy(pNewNode->key, key, keyLen);

722 723
  pNewNode->addedTime  = (uint64_t)taosGetTimestampMs();
  pNewNode->lifespan   = duration;
S
cache  
Shengliang Guan 已提交
724
  pNewNode->expireTime = pNewNode->addedTime + pNewNode->lifespan;
725 726
  pNewNode->signature  = (uint64_t)pNewNode;
  pNewNode->size       = (uint32_t)sizeInBytes;
727 728 729 730

  return pNewNode;
}

731
/**
 * Push a node onto the head of the trashcan list and mark it as being in the trashcan. Nodes already there are left
 * as they are.
 *
 * If the trashcan element cannot be allocated, an error is logged and the node is left untouched (not in the
 * trashcan). NOTE(review): such a node is then not reclaimed by the trashcan sweeps.
 */
void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheNode *pNode) {
  if (pNode->inTrashcan) { /* node is already in trash */
    assert(pNode->pTNodeHeader != NULL && pNode->pTNodeHeader->pData == pNode);
    return;
  }

  // allocate outside the trashcan lock to keep the critical section short
  STrashElem *pElem = calloc(1, sizeof(STrashElem));
  if (pElem == NULL) {
    uError("cache:%s key:%p, %p failed to move to trashcan, out of memory", pCacheObj->name, pNode->key, pNode->data);
    return;
  }
  pElem->pData = pNode;

  // capture what the log needs: once the lock is released, pNode may be destroyed by another thread
  char *key = pNode->key;
  char *data = pNode->data;

  __trashcan_wr_lock(pCacheObj);

  pNode->inTrashcan = true;
  pNode->pTNodeHeader = pElem;

  pElem->prev = NULL;
  pElem->next = pCacheObj->pTrash;
  if (pCacheObj->pTrash) {
    pCacheObj->pTrash->prev = pElem;
  }

  pCacheObj->pTrash = pElem;
  pCacheObj->numOfElemsInTrash++;
  uint32_t numOfElemsInTrash = pCacheObj->numOfElemsInTrash;

  __trashcan_unlock(pCacheObj);

  uDebug("cache:%s key:%p, %p move to trashcan, pTrashElem:%p, numOfElem in trashcan:%d", pCacheObj->name, key, data,
         pElem, numOfElemsInTrash);
}

H
Haojun Liao 已提交
758
/**
 * Destroy the trashcan nodes whose reference count is zero, or all of them when force is true. Runs entirely under
 * the trashcan lock.
 *
 * If the element counter is zero while a list head still exists, the head is dropped and the inconsistency is logged.
 */
void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) {
  __trashcan_wr_lock(pCacheObj);

  if (pCacheObj->numOfElemsInTrash != 0) {
    const char *stat[] = {"false", "true"};
    uDebug("cache:%s start to cleanup trashcan, numOfElem in trashcan:%d, free:%s", pCacheObj->name,
           pCacheObj->numOfElemsInTrash, stat[force ? 1 : 0]);

    STrashElem *pElem = pCacheObj->pTrash;
    while (pElem != NULL) {
      T_REF_VAL_CHECK(pElem->pData);
      assert(pElem->next != pElem && pElem->prev != pElem);

      bool removable = force || (T_REF_VAL_GET(pElem->pData) == 0);
      if (!removable) {
        pElem = pElem->next;
        continue;
      }

      uDebug("cache:%s, key:%p, %p removed from trashcan. numOfElem in trashcan:%d", pCacheObj->name, pElem->pData->key,
             pElem->pData->data, pCacheObj->numOfElemsInTrash - 1);

      doRemoveElemInTrashcan(pCacheObj, pElem);
      doDestroyTrashcanElem(pCacheObj, pElem);

      // restart the scan from the (possibly new) head
      pElem = pCacheObj->pTrash;
    }
  } else if (pCacheObj->pTrash != NULL) {
    pCacheObj->pTrash = NULL;
    uError("cache:%s, key:inconsistency data in cache, numOfElem in trashcan:%d", pCacheObj->name,
           pCacheObj->numOfElemsInTrash);
  }

  __trashcan_unlock(pCacheObj);
}

/*
 * Tear down a cache object and release its memory. pCacheObj must not be used after this call.
 *
 * Sequence:
 *   1. traverse all cached nodes with doRemoveNodeFn (no user callback is supplied);
 *   2. empty the trashcan with force == true, so trashcan elements are destroyed regardless of
 *      their reference count;
 *   3. destroy the trashcan lock, then free the bucket list, the name and the object itself.
 */
void doCleanupDataCache(SCacheObj *pCacheObj) {
  int64_t          nowMs   = taosGetTimestampMs();
  SCacheObjTravSup travSup = {.time = nowMs, .fp = NULL, .pCacheObj = pCacheObj};
  doTraverseElems(pCacheObj, doRemoveNodeFn, &travSup);

  // todo memory leak if there are object with refcount greater than 0 in hash table?
  taosTrashcanEmpty(pCacheObj, true);
  __trashcan_lock_destroy(pCacheObj);

  tfree(pCacheObj->name);
  tfree(pCacheObj->pEntryList);
  free(pCacheObj);
}
809

S
cache  
Shengliang Guan 已提交
810
/*
 * Traverse every node of pCacheObj with doRemoveExpiredFn, using `time` as the reference
 * timestamp. The optional callback fp and its argument param1 are forwarded to the traversal
 * through SCacheObjTravSup.
 */
static void doCacheRefresh(SCacheObj *pCacheObj, int64_t time, __cache_trav_fn_t fp, void *param1) {
  assert(pCacheObj != NULL);

  SCacheObjTravSup travSup = {
      .time      = time,
      .fp        = fp,
      .param1    = param1,
      .pCacheObj = pCacheObj,
  };

  doTraverseElems(pCacheObj, doRemoveExpiredFn, &travSup);
}

817
void taosCacheRefreshWorkerUnexpectedStopped(void) {
S
cache  
Shengliang Guan 已提交
818 819
  if (!refreshWorkerNormalStopped) {
    refreshWorkerUnexpectedStopped = true;
820 821 822
  }
}

S
cache  
Shengliang Guan 已提交
823
void *taosCacheTimedRefresh(void *handle) {
824 825
  assert(pCacheArrayList != NULL);
  uDebug("cache refresh thread starts");
826

H
Haojun Liao 已提交
827
  setThreadName("cacheRefresh");
828

S
cache  
Shengliang Guan 已提交
829 830
  const int32_t SLEEP_DURATION = 500;  // 500 ms
  int64_t       count = 0;
831
  atexit(taosCacheRefreshWorkerUnexpectedStopped);
832

S
cache  
Shengliang Guan 已提交
833
  while (1) {
834
    taosMsleep(SLEEP_DURATION);
835 836 837
    if (stopRefreshWorker) {
      goto _end;
    }
H
Haojun Liao 已提交
838

839 840 841
    pthread_mutex_lock(&guard);
    size_t size = taosArrayGetSize(pCacheArrayList);
    pthread_mutex_unlock(&guard);
842

843
    count += 1;
844

S
cache  
Shengliang Guan 已提交
845
    for (int32_t i = 0; i < size; ++i) {
846
      pthread_mutex_lock(&guard);
S
cache  
Shengliang Guan 已提交
847
      SCacheObj *pCacheObj = taosArrayGetP(pCacheArrayList, i);
848

849
      if (pCacheObj == NULL) {
850 851 852
        uError("object is destroyed. ignore and try next");
        pthread_mutex_unlock(&guard);
        continue;
853
      }
854

855 856
      // check if current cache object will be deleted every 500ms.
      if (pCacheObj->deleting) {
857 858 859
        taosArrayRemove(pCacheArrayList, i);
        size = taosArrayGetSize(pCacheArrayList);

S
cache  
Shengliang Guan 已提交
860 861
        uDebug("%s is destroying, remove it from refresh list, remain cache obj:%" PRIzu, pCacheObj->name, size);
        pCacheObj->deleting = 0;  // reset the deleting flag to enable pCacheObj to continue releasing resources.
862

863 864
        pthread_mutex_unlock(&guard);
        continue;
865
      }
866

867 868
      pthread_mutex_unlock(&guard);

869 870 871 872
      if ((count % pCacheObj->checkTick) != 0) {
        continue;
      }

873
      size_t elemInHash = pCacheObj->numOfElems;
874 875 876 877 878 879 880 881 882 883
      if (elemInHash + pCacheObj->numOfElemsInTrash == 0) {
        continue;
      }

      uDebug("%s refresh thread scan", pCacheObj->name);
      pCacheObj->statistics.refreshCount++;

      // refresh data in hash table
      if (elemInHash > 0) {
        int64_t now = taosGetTimestampMs();
A
AlexDuan 已提交
884
        doCacheRefresh(pCacheObj, now, NULL, NULL);
885 886 887 888
      }

      taosTrashcanEmpty(pCacheObj, false);
    }
889 890
  }

S
cache  
Shengliang Guan 已提交
891
_end:
892
  taosArrayDestroy(pCacheArrayList);
H
Haojun Liao 已提交
893 894 895

  pCacheArrayList = NULL;
  pthread_mutex_destroy(&guard);
S
cache  
Shengliang Guan 已提交
896
  refreshWorkerNormalStopped = true;
H
Haojun Liao 已提交
897

898
  uDebug("cache refresh thread quits");
899
  return NULL;
dengyihao's avatar
dengyihao 已提交
900
}
901

S
cache  
Shengliang Guan 已提交
902
/*
 * Refresh pCacheObj on demand using the current time, forwarding fp/param1 to the traversal.
 * A NULL cache object is silently ignored.
 */
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void *param1) {
  if (pCacheObj != NULL) {
    doCacheRefresh(pCacheObj, taosGetTimestampMs(), fp, param1);
  }
}
910

H
Haojun Liao 已提交
911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001
/*
 * Ask the cache refresh thread to exit. The flag is only polled after each sleep period of the
 * worker loop, so this does not wait for the thread to finish.
 * NOTE(review): stopRefreshWorker is a plain bool shared between threads, not an atomic.
 */
void taosStopCacheRefreshWorker(void) {
  const bool requestStop = true;
  stopRefreshWorker = requestStop;
}

/*
 * Total number of objects owned by the cache: live entries in the hash plus elements still
 * waiting in the trashcan. A NULL cache object holds nothing and yields 0, matching how
 * taosCacheRefresh tolerates NULL, instead of dereferencing it.
 */
size_t taosCacheGetNumOfObj(const SCacheObj* pCacheObj) {
  if (pCacheObj == NULL) {
    return 0;
  }

  return pCacheObj->numOfElems + pCacheObj->numOfElemsInTrash;
}

SCacheIter* taosCacheCreateIter(const SCacheObj* pCacheObj) {
  ASSERT(pCacheObj != NULL);
  SCacheIter* pIter = calloc(1, sizeof(SCacheIter));
  pIter->pCacheObj  = (SCacheObj*) pCacheObj;
  pIter->entryIndex = -1;
  pIter->index      = -1;
  return pIter;
}

/*
 * Advance the iterator to the next cached object.
 *
 * The iterator works bucket by bucket. When it enters a non-empty bucket (SCacheEntry), it copies
 * that bucket's node pointers into pIter->pCurrent under the bucket's read latch and takes one
 * reference on each node. It then hands the nodes out one by one. When the snapshot is exhausted,
 * the references are released before moving on. This also happens on the final call that returns
 * false, so finished iterations do not pin objects.
 *
 * Returns true if pIter now points at a valid object (see taosCacheIterGetData/GetKey).
 * Returns false when all buckets are exhausted, or on allocation failure; in the latter case
 * terrno is set to TSDB_CODE_OUT_OF_MEMORY and the bucket that could not be snapshotted is skipped.
 */
bool taosCacheIterNext(SCacheIter* pIter) {
  SCacheObj* pCacheObj = pIter->pCacheObj;

  // More objects remain in the current snapshot.
  if (pIter->index + 1 < pIter->numOfObj) {
    pIter->index += 1;
    return true;
  }

  // The snapshot buffer holds at least this many slots; remember it before resetting the count.
  int32_t allocated = pIter->numOfObj;

  // release the reference for all objects in the snapshot
  for (int32_t i = 0; i < pIter->numOfObj; ++i) {
    char* p = pIter->pCurrent[i]->data;
    taosCacheRelease(pCacheObj, (void**)&p, false);
    pIter->pCurrent[i] = NULL;
  }

  // Keep the iterator consistent even if we return false below: no live snapshot entries remain.
  pIter->numOfObj = 0;
  pIter->index    = -1;

  // Bound every step by capacity: trailing empty buckets must not walk past pEntryList.
  while (pIter->entryIndex + 1 < pCacheObj->capacity) {
    SCacheEntry* pEntry = &pCacheObj->pEntryList[++pIter->entryIndex];
    taosRLockLatch(&pEntry->latch);

    if (pEntry->num == 0) {
      taosRUnLockLatch(&pEntry->latch);
      continue;
    }

    if (allocated < pEntry->num) {
      char* tmp = realloc(pIter->pCurrent, pEntry->num * POINTER_BYTES);
      if (tmp == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        taosRUnLockLatch(&pEntry->latch);
        return false;
      }

      pIter->pCurrent = (SCacheNode**)tmp;
    }

    SCacheNode* pNode = pEntry->next;
    for (int32_t i = 0; i < pEntry->num; ++i) {
      ASSERT(pNode != NULL);

      pIter->pCurrent[i] = pNode;
      int32_t ref = T_REF_INC(pIter->pCurrent[i]);
      ASSERT(ref >= 1);

      pNode = pNode->pNext;
    }

    pIter->numOfObj = pEntry->num;
    taosRUnLockLatch(&pEntry->latch);

    pIter->index = 0;
    return true;
  }

  return false;
}

/*
 * Return the data pointer of the object the iterator currently points at and store its length
 * (the node's dataLen) in *len. Only valid after taosCacheIterNext() returned true.
 */
void* taosCacheIterGetData(const SCacheIter* pIter, size_t* len) {
  int32_t     pos = pIter->index;
  SCacheNode* cur = pIter->pCurrent[pos];

  *len = cur->dataLen;
  return cur->data;
}

/*
 * Return the key pointer of the object the iterator currently points at and store the key length
 * (the node's keyLen) in *len. Only valid after taosCacheIterNext() returned true.
 */
void* taosCacheIterGetKey(const SCacheIter* pIter, size_t* len) {
  int32_t     pos = pIter->index;
  SCacheNode* cur = pIter->pCurrent[pos];

  *len = cur->keyLen;
  return cur->key;
}

/*
 * Free an iterator created by taosCacheCreateIter(). NULL is accepted and ignored.
 *
 * If the iterator is destroyed while it still holds a snapshot (e.g. the caller stopped early),
 * the references taken on those nodes by taosCacheIterNext() are released here. Otherwise they
 * would leak and keep the objects pinned. NULL slots (left behind after a failed snapshot
 * allocation) are skipped.
 */
void taosCacheDestroyIter(SCacheIter* pIter) {
  if (pIter == NULL) {
    return;
  }

  for (int32_t i = 0; i < pIter->numOfObj; ++i) {
    SCacheNode* pNode = pIter->pCurrent[i];
    if (pNode == NULL) {
      continue;
    }

    char* p = pNode->data;
    taosCacheRelease(pIter->pCacheObj, (void**)&p, false);
    pIter->pCurrent[i] = NULL;
  }

  tfree(pIter->pCurrent);
  tfree(pIter);
}