tcache.c 25.0 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#define _DEFAULT_SOURCE
S
cache  
Shengliang Guan 已提交
17
#include "tcache.h"
S
log  
Shengliang Guan 已提交
18
#include "tlog.h"
H
hzcheng 已提交
19 20 21
#include "ttimer.h"
#include "tutil.h"

22
/**
 * Take the cache object's lock for exclusive access.
 * LINUX builds use a pthread rwlock (write side); all other builds use a pthread mutex.
 * @param pCacheObj cache object whose lock is taken
 */
static FORCE_INLINE void __cache_wr_lock(SCacheObj *pCacheObj) {
#ifdef LINUX
  pthread_rwlock_wrlock(&pCacheObj->lock);
#else
  pthread_mutex_lock(&pCacheObj->lock);
#endif
}

30
/**
 * Release the cache object's lock taken with __cache_wr_lock.
 * LINUX builds unlock a pthread rwlock; all other builds unlock a pthread mutex.
 * @param pCacheObj cache object whose lock is released
 */
static FORCE_INLINE void __cache_unlock(SCacheObj *pCacheObj) {
#ifdef LINUX
  pthread_rwlock_unlock(&pCacheObj->lock);
#else
  pthread_mutex_unlock(&pCacheObj->lock);
#endif
}

38
/**
 * Initialize the cache object's lock with default attributes.
 * LINUX builds create a pthread rwlock; all other builds create a pthread mutex.
 * @param pCacheObj cache object whose lock is initialized
 * @return          the value returned by the pthread init call (0 on success)
 */
static FORCE_INLINE int32_t __cache_lock_init(SCacheObj *pCacheObj) {
#ifdef LINUX
  return pthread_rwlock_init(&pCacheObj->lock, NULL);
#else
  return pthread_mutex_init(&pCacheObj->lock, NULL);
#endif
}

46
/**
 * Destroy the cache object's lock created by __cache_lock_init.
 * LINUX builds destroy a pthread rwlock; all other builds destroy a pthread mutex.
 * @param pCacheObj cache object whose lock is destroyed
 */
static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pCacheObj) {
#ifdef LINUX
  pthread_rwlock_destroy(&pCacheObj->lock);
#else
  pthread_mutex_destroy(&pCacheObj->lock);
#endif
}

54 55 56 57 58 59 60 61 62 63
/**
 * do cleanup the taos cache
 * @param pCacheObj
 */
static void doCleanupDataCache(SCacheObj *pCacheObj);

/**
 * refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime
 * @param handle   Cache object handle
 */
S
cache  
Shengliang Guan 已提交
64
static void *taosCacheTimedRefresh(void *handle);
65

S
cache  
Shengliang Guan 已提交
66 67 68 69 70 71 72
// handle of the background refresh thread; filled in by pthread_create in doInitRefreshThread
static pthread_t       cacheRefreshWorker = {0};
// once-control that makes doInitRefreshThread run a single time (see doRegisterCacheObj)
static pthread_once_t  cacheThreadInit = PTHREAD_ONCE_INIT;
// mutex held while pCacheArrayList is read or modified
static pthread_mutex_t guard = PTHREAD_MUTEX_INITIALIZER;
// pointers to all registered SCacheObj instances; scanned by the refresh thread
static SArray         *pCacheArrayList = NULL;
// set by taosStopCacheRefreshWorker; polled by the refresh thread after every sleep
static bool            stopRefreshWorker = false;
// set by the refresh thread right before it returns
static bool            refreshWorkerNormalStopped = false;
// set by taosCacheRefreshWorkerUnexpectedStopped (registered with atexit) when the worker has not stopped normally
static bool            refreshWorkerUnexpectedStopped = false;
73

H
Haojun Liao 已提交
74
static void doInitRefreshThread(void) {
75 76 77 78 79 80 81 82 83 84
  pCacheArrayList = taosArrayInit(4, POINTER_BYTES);

  pthread_attr_t thattr;
  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);

  pthread_create(&cacheRefreshWorker, &thattr, taosCacheTimedRefresh, NULL);
  pthread_attr_destroy(&thattr);
}

S
cache  
Shengliang Guan 已提交
85
/**
 * Register a cache object with the background refresh worker.
 * The first call creates the worker and the global object list through pthread_once;
 * the object pointer is then appended to that list while `guard` is held.
 * @param pCacheObj cache object to register
 * @return          handle of the shared refresh thread
 */
pthread_t doRegisterCacheObj(SCacheObj *pCacheObj) {
  pthread_once(&cacheThreadInit, doInitRefreshThread);

  pthread_mutex_lock(&guard);
  taosArrayPush(pCacheArrayList, &pCacheObj);
  pthread_mutex_unlock(&guard);

  pthread_t worker = cacheRefreshWorker;
  return worker;
}

H
hzcheng 已提交
95 96 97 98 99 100
/**
 * @param key      key of object for hash, usually a null-terminated string
 * @param keyLen   length of key
 * @param pData    actual data. It must be one consecutive memory block without embedded
 *                 pointers, because the block is copied byte by byte and copied pointers
 *                 cause memory access errors.
 * @param size     size of block
 * @param duration total survival time from now
 * @return         SCacheDataNode
 */
S
cache  
Shengliang Guan 已提交
104 105
static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size,
                                           uint64_t duration);
H
hzcheng 已提交
106 107

/**
 * add the object node into trash; the object is closed for referencing once it is added to trash.
 * It will not be removed until the pNode->refCount == 0
 * @param pCacheObj    Cache object
 * @param pNode   Cache slot object
 */
H
Haojun Liao 已提交
113
static void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheDataNode *pNode);
114

H
hzcheng 已提交
115 116 117
/**
 * remove nodes in trash with refCount == 0 in cache
 * @param pCacheObj cache object
 * @param force   force mode, if true, remove data in trash without checking the refcount.
 *                may cause corruption. So, force mode only applies before the cache is closed
 */
H
Haojun Liao 已提交
122
static void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force);
H
hzcheng 已提交
123 124 125

/**
 * release node
126
 * @param pCacheObj      cache object
H
Haojun Liao 已提交
127
 * @param pNode          data node
H
hzcheng 已提交
128
 */
129
static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
H
hzcheng 已提交
130
  if (pNode->signature != (uint64_t)pNode) {
S
slguan 已提交
131
    uError("key:%s, %p data is invalid, or has been released", pNode->key, pNode);
H
hzcheng 已提交
132 133
    return;
  }
134

H
Haojun Liao 已提交
135
  atomic_sub_fetch_64(&pCacheObj->totalSize, pNode->size);
S
Shengliang Guan 已提交
136
  int32_t size = (int32_t)taosHashGetSize(pCacheObj->pHashTable);
H
Haojun Liao 已提交
137 138
  assert(size > 0);

H
Haojun Liao 已提交
139
  uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, total num:%d size:%" PRId64 "bytes",
H
Haojun Liao 已提交
140
         pCacheObj->name, pNode->key, pNode->data, pNode->size, size - 1, pCacheObj->totalSize);
H
Haojun Liao 已提交
141 142 143 144

  if (pCacheObj->freeFp) {
    pCacheObj->freeFp(pNode->data);
  }
145

H
hzcheng 已提交
146 147 148
  free(pNode);
}

S
cache  
Shengliang Guan 已提交
149 150
/**
 * Unlink one element from the trashcan's doubly linked list and decrement the element
 * counter. Neither the element nor its node is freed here.
 * @param pCacheObj cache object that owns the trashcan
 * @param pElem     element to unlink
 * @return          the element that followed pElem, or NULL when pElem was the tail or when
 *                  the node's signature is invalid (nothing is unlinked in that case)
 */
static FORCE_INLINE STrashElem *doRemoveElemInTrashcan(SCacheObj *pCacheObj, STrashElem *pElem) {
  SCacheDataNode *pData = pElem->pData;
  if (pData->signature != (uint64_t)pData) {
    uWarn("key:sig:0x%" PRIx64 " %p data has been released, ignore", pData->signature, pData);
    return NULL;
  }

  STrashElem *pPrev = pElem->prev;
  STrashElem *pNext = pElem->next;

  pCacheObj->numOfElemsInTrash--;

  if (pPrev == NULL) {
    // pElem is the list head, so the head moves to its successor
    pCacheObj->pTrash = pNext;
  } else {
    pPrev->next = pNext;
  }

  if (pNext != NULL) {
    pNext->prev = pPrev;
  }

  // an empty trashcan must not have a head element
  assert(pCacheObj->numOfElemsInTrash != 0 || pCacheObj->pTrash == NULL);

  return pNext;
}

S
cache  
Shengliang Guan 已提交
175
/**
 * Free a trashcan element together with the data node it refers to.
 * The user supplied free callback, if any, is run on the node payload first.
 * @param pCacheObj cache object providing the free callback
 * @param pElem     trashcan element to destroy
 */
static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj *pCacheObj, STrashElem *pElem) {
  SCacheDataNode *pNode = pElem->pData;

  if (pCacheObj->freeFp != NULL) {
    pCacheObj->freeFp(pNode->data);
  }

  free(pNode);
  free(pElem);
}
H
hzcheng 已提交
183

S
cache  
Shengliang Guan 已提交
184 185 186
/**
 * Create a cache object and register it with the background refresh worker.
 * @param keyType              key type used to select the default hash function
 * @param refreshTimeInSeconds refresh interval in seconds, must be greater than 0
 * @param extendLifespan       if true, releasing an entry restarts its time to live
 * @param fn                   optional callback run on an entry's payload before the entry is freed
 * @param cacheName            name of the cache, copied and used in log messages
 * @return                     the new cache object, or NULL on invalid interval or any
 *                             allocation / lock initialization failure
 */
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn,
                         const char *cacheName) {
  const int32_t SLEEP_DURATION = 500;  // 500 ms

  if (refreshTimeInSeconds <= 0) {
    return NULL;
  }

  SCacheObj *pCacheObj = (SCacheObj *)calloc(1, sizeof(SCacheObj));
  if (pCacheObj == NULL) {
    uError("failed to allocate memory, reason:%s", strerror(errno));
    return NULL;
  }

  pCacheObj->pHashTable = taosHashInit(4096, taosGetDefaultHashFunction(keyType), false, HASH_ENTRY_LOCK);
  if (pCacheObj->pHashTable == NULL) {
    // log first so that errno is not disturbed by the cleanup
    uError("failed to allocate memory, reason:%s", strerror(errno));
    free(pCacheObj);
    return NULL;
  }

  // duplicate the name only after the hash table exists, so no failure path leaks it
  pCacheObj->name = strdup(cacheName);
  if (pCacheObj->name == NULL) {
    uError("failed to allocate memory, reason:%s", strerror(errno));
    taosHashCleanup(pCacheObj->pHashTable);
    free(pCacheObj);
    return NULL;
  }

  // set free cache node callback function
  pCacheObj->freeFp = fn;
  pCacheObj->refreshTime = refreshTimeInSeconds * 1000;
  pCacheObj->checkTick = pCacheObj->refreshTime / SLEEP_DURATION;
  pCacheObj->extendLifespan = extendLifespan;  // the TTL after the last access

  // pthread init functions return the error number instead of setting errno
  int32_t code = __cache_lock_init(pCacheObj);
  if (code != 0) {
    uError("failed to init lock, reason:%s", strerror(code));
    taosHashCleanup(pCacheObj->pHashTable);
    free(pCacheObj->name);
    free(pCacheObj);
    return NULL;
  }

  doRegisterCacheObj(pCacheObj);
  return pCacheObj;
}
H
hzcheng 已提交
223

S
cache  
Shengliang Guan 已提交
224 225
/**
 * Insert a copy of pData into the cache under the given key and return a pointer to the
 * cached copy. The returned payload carries one reference owned by the caller.
 * If the key already exists, the existing hash entry is removed and the insert is retried
 * until it succeeds.
 * @param pCacheObj  cache object
 * @param key        key bytes
 * @param keyLen     length of key
 * @param pData      data to copy into the cache
 * @param dataSize   size of pData in bytes
 * @param durationMS lifespan of the entry, passed on to taosCreateCacheNode
 * @return           pointer to the cached payload, or NULL if the cache is unusable or the
 *                   node could not be allocated
 */
void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize,
                   int32_t durationMS) {
  if (pCacheObj == NULL || pCacheObj->pHashTable == NULL || pCacheObj->deleting == 1) {
    return NULL;
  }

  SCacheDataNode *pNode1 = taosCreateCacheNode(key, keyLen, pData, dataSize, durationMS);
  if (pNode1 == NULL) {
    uError("cache:%s, key:%p, failed to added into cache, out of memory", pCacheObj->name, key);
    return NULL;
  }

  T_REF_INC(pNode1);

  int32_t succ = taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *));
  if (succ == 0) {
    atomic_add_fetch_64(&pCacheObj->totalSize, pNode1->size);
    uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64
           ", totalNum:%d totalSize:%" PRId64 "bytes size:%" PRId64 "bytes",
           pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime,
           (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, (int64_t)dataSize);
  } else {  // duplicated key exists
    while (1) {
      // taosHashRemove does not hand back the removed entry, so p stays NULL here.
      // TODO: restore a remove-with-data hash API so that the old node can be freed or
      // moved to the trashcan; until then the old node cannot be reclaimed at this point.
      SCacheDataNode *p = NULL;
      int32_t         ret = taosHashRemove(pCacheObj->pHashTable, key, keyLen);

      // dispose of the previous node; guarded so that a missing node is never dereferenced
      if (ret == 0 && p != NULL) {
        if (T_REF_VAL_GET(p) == 0) {
          if (pCacheObj->freeFp) {
            pCacheObj->freeFp(p->data);
          }

          atomic_sub_fetch_64(&pCacheObj->totalSize, p->size);
          tfree(p);
        } else {
          taosAddToTrashcan(pCacheObj, p);
          uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode1->data, p->data);
        }
      }

      assert(T_REF_VAL_GET(pNode1) == 1);

      ret = taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *));
      if (ret == 0) {
        atomic_add_fetch_64(&pCacheObj->totalSize, pNode1->size);

        uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64
               ", totalNum:%d totalSize:%" PRId64 "bytes size:%" PRId64 "bytes",
               pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime,
               (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, (int64_t)dataSize);

        return pNode1->data;
      }

      // the put failed again (another writer re-inserted the key), try once more
    }
  }

  return pNode1->data;
}

S
cache  
Shengliang Guan 已提交
288
static void incRefFn(void *ptNode) {
H
Haojun Liao 已提交
289 290
  assert(ptNode != NULL);

S
cache  
Shengliang Guan 已提交
291
  SCacheDataNode **p = (SCacheDataNode **)ptNode;
H
Haojun Liao 已提交
292
  assert(T_REF_VAL_GET(*p) >= 0);
H
Haojun Liao 已提交
293

H
Haojun Liao 已提交
294 295 296 297
  int32_t ret = T_REF_INC(*p);
  assert(ret > 0);
}

H
Haojun Liao 已提交
298
/**
 * Look up an entry by key and return its payload with one additional reference taken.
 * Hit/miss statistics are updated; totalAccess is only counted when the hash table is not empty.
 * @param pCacheObj cache object
 * @param key       key bytes
 * @param keyLen    length of key
 * @return          pointer to the cached payload, or NULL when the cache is NULL, is being
 *                  deleted, or does not contain the key
 */
void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) {
  if (pCacheObj == NULL || pCacheObj->deleting == 1) {
    return NULL;
  }

  if (taosHashGetSize(pCacheObj->pHashTable) == 0) {
    atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
    return NULL;
  }

  SCacheDataNode *ptNode = NULL;
  taosHashGetClone(pCacheObj->pHashTable, key, keyLen, &ptNode);

  void *pData = NULL;
  if (ptNode != NULL) {
    // Every acquire is paired with a taosCacheRelease that decrements the count, so the
    // reference has to be taken here.
    // NOTE(review): the increment is not atomic with the lookup; the hash API used to take
    // an increment callback (incRefFn) for that purpose.
    int32_t ref = T_REF_INC(ptNode);
    pData = ptNode->data;

    atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
    uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData, ref);
  } else {
    atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
    uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
  }

  atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
  return pData;
}

327 328
/**
 * Take one more reference on an entry through a payload pointer that was obtained from
 * this cache earlier.
 * @param pCacheObj cache object
 * @param data      payload pointer previously returned by the cache
 * @return          data itself, or NULL when an argument is NULL or the node signature is invalid
 */
void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
  if (pCacheObj == NULL || data == NULL) {
    return NULL;
  }

  // step back from the payload to the node header it is embedded in
  SCacheDataNode *pNode = (SCacheDataNode *)((char *)data - offsetof(SCacheDataNode, data));
  if (pNode->signature != (uint64_t)pNode) {
    uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, pNode);
    return NULL;
  }

  int32_t refCnt = T_REF_INC(pNode);
  uDebug("cache:%s, data: %p acquired by data in cache, refcnt:%d", pCacheObj->name, pNode->data, refCnt);

  // the caller already holds a reference, so the count after the increment is at least 2
  assert(refCnt >= 2);
  return data;
}

346
/**
 * Move ownership of a cached payload pointer: return the pointer stored in *data and set
 * *data to NULL. The reference count is not changed.
 * @param pCacheObj cache object
 * @param data      address of the payload pointer to hand over
 * @return          the former value of *data, or NULL when an argument is NULL or the node
 *                  signature is invalid
 */
void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) {
  if (pCacheObj == NULL || data == NULL || (*data) == NULL) {
    return NULL;
  }

  // step back from the payload to the node header it is embedded in
  SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offsetof(SCacheDataNode, data));
  if (pNode->signature != (uint64_t)pNode) {
    uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, pNode);
    return NULL;
  }

  assert(T_REF_VAL_GET(pNode) >= 1);

  // hand the pointer over and clear the old holder's copy
  void *moved = *data;
  *data = NULL;

  return moved;
}
366

367
/**
 * Give back one reference on a cached payload and clear the caller's pointer.
 * @param pCacheObj cache object
 * @param data      address of the payload pointer; *data is set to NULL on success
 * @param _remove   if true, additionally remove the entry from the cache: it is destroyed
 *                  right away when this was the last reference, otherwise it is moved to
 *                  (or stays in) the trashcan until the remaining users release it
 */
void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
  if (pCacheObj == NULL) {
    return;
  }

  if (data == NULL || (*data) == NULL) {
    uError("cache:%s, NULL data to release", pCacheObj->name);
    return;
  }

  // The operation of removal from hash table and addition to trashcan is not an atomic operation,
  // therefore the check for the empty of both the hash table and the trashcan has a race condition.
  // It happens when there is only one object in the cache, and two threads which have referenced this object
  // start to free it simultaneously [TD-1569].
  size_t offset = offsetof(SCacheDataNode, data);

  SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset);
  if (pNode->signature != (uint64_t)pNode) {
    uError("cache:%s, %p, release invalid cache data", pCacheObj->name, pNode);
    return;
  }

  *data = NULL;

  // note: extend lifespan before dec ref count
  bool inTrashcan = pNode->inTrashcan;

  if (pCacheObj->extendLifespan && (!inTrashcan) && (!_remove)) {
    atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs());
    uDebug("cache:%s, data:%p extend expire time: %" PRId64, pCacheObj->name, pNode->data, pNode->expireTime);
  }

  if (_remove) {
    // NOTE: once refcount is decreased, pNode may be freed by other thread immediately.
    char *key = pNode->key;
    char *d = pNode->data;

    int32_t ref = T_REF_VAL_GET(pNode);
    uDebug("cache:%s, key:%p, %p is released, refcnt:%d, in trashcan:%d", pCacheObj->name, key, d, ref - 1, inTrashcan);

    /*
     * If it is not referenced by other users, remove it immediately. Otherwise move this node to trashcan wait for all
     * users releasing this resources.
     *
     * NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread
     * that tries to do the same thing.
     */
    if (inTrashcan) {
      ref = T_REF_VAL_GET(pNode);

      if (ref == 1) {
        // If it is the last ref, remove it from trashcan linked-list first, and then destroy it. Otherwise, it may be
        // destroyed by refresh worker if decrease ref count before removing it from linked-list.
        assert(pNode->pTNodeHeader->pData == pNode);

        __cache_wr_lock(pCacheObj);
        doRemoveElemInTrashcan(pCacheObj, pNode->pTNodeHeader);
        __cache_unlock(pCacheObj);

        ref = T_REF_DEC(pNode);
        assert(ref == 0);

        doDestroyTrashcanElem(pCacheObj, pNode->pTNodeHeader);
      } else {
        ref = T_REF_DEC(pNode);
        assert(ref >= 0);
      }
    } else {
      // NOTE: remove it from hash in the first place, otherwise, the pNode may have been released by other thread
      // when reaches here.
      // taosHashRemove does not hand back the removed entry, so p stays NULL and a successful
      // removal is treated as the removal of pNode itself.
      // TODO: restore a remove-with-data hash API to detect that a newer entry was removed instead.
      SCacheDataNode *p = NULL;
      int32_t         ret = taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
      ref = T_REF_DEC(pNode);

      // successfully remove from hash table, if failed, this node must have been move to trash already, do nothing.
      // note that the remove operation can be executed only once.
      if (ret == 0) {
        if (p != NULL && p != pNode) {
          uDebug(
              "cache:%s, key:%p, successfully removed a new entry:%p, refcnt:%d, prev entry:%p has been removed by "
              "others already",
              pCacheObj->name, pNode->key, p->data, T_REF_VAL_GET(p), pNode->data);

          assert(p->pTNodeHeader == NULL);
          taosAddToTrashcan(pCacheObj, p);
        } else {
          uDebug("cache:%s, key:%p, %p successfully removed from hash table, refcnt:%d", pCacheObj->name, pNode->key,
                 pNode->data, ref);
          if (ref > 0) {
            assert(pNode->pTNodeHeader == NULL);

            taosAddToTrashcan(pCacheObj, pNode);
          } else {  // ref == 0
            atomic_sub_fetch_64(&pCacheObj->totalSize, pNode->size);

            int32_t size = (int32_t)taosHashGetSize(pCacheObj->pHashTable);
            uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, totalNum:%d size:%" PRId64 "bytes",
                   pCacheObj->name, pNode->key, pNode->data, pNode->size, size, pCacheObj->totalSize);

            if (pCacheObj->freeFp) {
              pCacheObj->freeFp(pNode->data);
            }

            free(pNode);
          }
        }
      } else {
        uDebug("cache:%s, key:%p, %p has been removed from hash table by others already, refcnt:%d", pCacheObj->name,
               pNode->key, pNode->data, ref);
      }
    }

  } else {
    // NOTE: once refcount is decreased, pNode may be freed by other thread immediately.
    char *key = pNode->key;
    char *p = pNode->data;

    int32_t ref = T_REF_DEC(pNode);
    uDebug("cache:%s, key:%p, %p released, refcnt:%d, data in trashcan:%d", pCacheObj->name, key, p, ref, inTrashcan);
  }
}
H
Haojun Liao 已提交
511

H
Haojun Liao 已提交
512
typedef struct SHashTravSupp {
S
cache  
Shengliang Guan 已提交
513 514
  SCacheObj        *pCacheObj;
  int64_t           time;
A
AlexDuan 已提交
515
  __cache_trav_fn_t fp;
S
cache  
Shengliang Guan 已提交
516
  void             *param1;
H
Haojun Liao 已提交
517 518
} SHashTravSupp;

S
cache  
Shengliang Guan 已提交
519 520 521
static bool travHashTableEmptyFn(void *param, void *data) {
  SHashTravSupp *ps = (SHashTravSupp *)param;
  SCacheObj     *pCacheObj = ps->pCacheObj;
H
Haojun Liao 已提交
522

S
cache  
Shengliang Guan 已提交
523
  SCacheDataNode *pNode = *(SCacheDataNode **)data;
H
Haojun Liao 已提交
524 525 526

  if (T_REF_VAL_GET(pNode) == 0) {
    taosCacheReleaseNode(pCacheObj, pNode);
S
cache  
Shengliang Guan 已提交
527
  } else {  // do add to trashcan
H
Haojun Liao 已提交
528
    taosAddToTrashcan(pCacheObj, pNode);
529
  }
H
Haojun Liao 已提交
530 531 532

  // this node should be remove from hash table
  return false;
533 534
}

535
/**
 * Drop the entries of the trashcan that are no longer referenced.
 * NOTE(review): the traversal that moves the hash table entries out of the table
 * (travHashTableEmptyFn) is currently disabled, so entries still stored in the hash table
 * are not touched.
 * @param pCacheObj cache object, a NULL pointer is ignored
 */
void taosCacheEmpty(SCacheObj *pCacheObj) {
  if (pCacheObj == NULL) {
    return;
  }

  taosTrashcanEmpty(pCacheObj, false);
}

/**
 * Destroy a cache object.
 * The object is flagged as deleting, then this function polls every 50 ms until the refresh
 * thread has reset the flag (after dropping the object from its list) or has stopped
 * normally. If the refresh thread stopped unexpectedly, the function returns without
 * releasing anything.
 * @param pCacheObj cache object, a NULL pointer is ignored
 */
void taosCacheCleanup(SCacheObj *pCacheObj) {
  if (pCacheObj == NULL) return;

  pCacheObj->deleting = 1;

  // wait for the refresh thread quit before destroying the cache object.
  // But in the dll, the child thread will be killed before atexit takes effect.
  for (;;) {
    if (atomic_load_8(&pCacheObj->deleting) == 0) break;
    if (refreshWorkerNormalStopped) break;
    if (refreshWorkerUnexpectedStopped) return;

    taosMsleep(50);
  }

  uInfo("cache:%s will be cleaned up", pCacheObj->name);
  doCleanupDataCache(pCacheObj);
}

H
Haojun Liao 已提交
561
SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration) {
562
  size_t totalSize = size + sizeof(SCacheDataNode) + keyLen;
563 564 565 566 567 568 569 570 571 572

  SCacheDataNode *pNewNode = calloc(1, totalSize);
  if (pNewNode == NULL) {
    uError("failed to allocate memory, reason:%s", strerror(errno));
    return NULL;
  }

  memcpy(pNewNode->data, pData, size);

  pNewNode->key = (char *)pNewNode + sizeof(SCacheDataNode) + size;
S
Shengliang Guan 已提交
573
  pNewNode->keySize = (uint16_t)keyLen;
574 575 576

  memcpy(pNewNode->key, key, keyLen);

S
cache  
Shengliang Guan 已提交
577 578 579 580 581
  pNewNode->addedTime = (uint64_t)taosGetTimestampMs();
  pNewNode->lifespan = duration;
  pNewNode->expireTime = pNewNode->addedTime + pNewNode->lifespan;
  pNewNode->signature = (uint64_t)pNewNode;
  pNewNode->size = (uint32_t)totalSize;
582 583 584 585

  return pNewNode;
}

H
Haojun Liao 已提交
586
/**
 * Put a node at the head of the trashcan list and mark it as being in the trashcan.
 * A node that is already in the trashcan is left unchanged.
 * If the list element cannot be allocated, the failure is logged and the node is not added.
 * @param pCacheObj cache object that owns the trashcan
 * @param pNode     node to add
 */
void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
  if (pNode->inTrashcan) { /* node is already in trash */
    assert(pNode->pTNodeHeader != NULL && pNode->pTNodeHeader->pData == pNode);
    return;
  }

  // allocate outside of the lock: calloc already zeroes the prev/next links
  STrashElem *pElem = calloc(1, sizeof(STrashElem));
  if (pElem == NULL) {
    uError("cache:%s key:%p, %p failed to move to trashcan, reason:%s", pCacheObj->name, pNode->key, pNode->data,
           strerror(errno));
    return;
  }

  pElem->pData = pNode;

  __cache_wr_lock(pCacheObj);
  pNode->inTrashcan = true;
  pNode->pTNodeHeader = pElem;

  // link the new element in front of the current head
  pElem->next = pCacheObj->pTrash;
  if (pCacheObj->pTrash) {
    pCacheObj->pTrash->prev = pElem;
  }

  pCacheObj->pTrash = pElem;
  pCacheObj->numOfElemsInTrash++;
  __cache_unlock(pCacheObj);

  uDebug("cache:%s key:%p, %p move to trashcan, pTrashElem:%p, numOfElem in trashcan:%d", pCacheObj->name, pNode->key,
         pNode->data, pElem, pCacheObj->numOfElemsInTrash);
}

H
Haojun Liao 已提交
613
/**
 * Remove and destroy trashcan entries while holding the cache lock.
 * @param pCacheObj cache object that owns the trashcan
 * @param force     if true every entry is destroyed regardless of its reference count,
 *                  otherwise only entries whose reference count is 0
 */
void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) {
  __cache_wr_lock(pCacheObj);

  if (pCacheObj->numOfElemsInTrash == 0) {
    if (pCacheObj->pTrash != NULL) {
      // counter and list disagree: reset the list head and report it
      pCacheObj->pTrash = NULL;
      uError("cache:%s, key:inconsistency data in cache, numOfElem in trashcan:%d", pCacheObj->name,
             pCacheObj->numOfElemsInTrash);
    }

    __cache_unlock(pCacheObj);
    return;
  }

  const char *stat[] = {"false", "true"};
  uDebug("cache:%s start to cleanup trashcan, numOfElem in trashcan:%d, free:%s", pCacheObj->name,
         pCacheObj->numOfElemsInTrash, (force ? stat[1] : stat[0]));

  STrashElem *pElem = pCacheObj->pTrash;
  while (pElem) {
    T_REF_VAL_CHECK(pElem->pData);
    assert(pElem->next != pElem && pElem->prev != pElem);

    // Remember the successor before pElem may be freed. The list cannot change while the
    // lock is held, so continuing from here visits every element exactly once instead of
    // rescanning from the head after each removal.
    STrashElem *pNext = pElem->next;

    if (force || (T_REF_VAL_GET(pElem->pData) == 0)) {
      uDebug("cache:%s, key:%p, %p removed from trashcan. numOfElem in trashcan:%d", pCacheObj->name, pElem->pData->key,
             pElem->pData->data, pCacheObj->numOfElemsInTrash - 1);

      doRemoveElemInTrashcan(pCacheObj, pElem);
      doDestroyTrashcanElem(pCacheObj, pElem);
    }

    pElem = pNext;
  }

  __cache_unlock(pCacheObj);
}

/**
 * Tear down a cache object: destroy the hash table, force-empty the trashcan, destroy the
 * lock, free the name and finally the object itself.
 * @param pCacheObj cache object to destroy; the pointer is invalid afterwards
 */
void doCleanupDataCache(SCacheObj *pCacheObj) {
  // NOTE(review): the traversal that released the nodes stored in the hash table
  // (travHashTableEmptyFn) is disabled, so only the table itself is destroyed here.
  // todo memory leak if there are object with refcount greater than 0 in hash table?
  taosHashCleanup(pCacheObj->pHashTable);

  // entries parked in the trashcan are destroyed without looking at their reference count
  taosTrashcanEmpty(pCacheObj, true);

  __cache_lock_destroy(pCacheObj);

  tfree(pCacheObj->name);

  // wipe the object before it is returned to the allocator
  memset(pCacheObj, 0, sizeof(SCacheObj));
  free(pCacheObj);
}
665

S
cache  
Shengliang Guan 已提交
666 667 668
bool travHashTableFn(void *param, void *data) {
  SHashTravSupp *ps = (SHashTravSupp *)param;
  SCacheObj     *pCacheObj = ps->pCacheObj;
669

S
cache  
Shengliang Guan 已提交
670
  SCacheDataNode *pNode = *(SCacheDataNode **)data;
S
Shengliang Guan 已提交
671
  if ((int64_t)pNode->expireTime < ps->time && T_REF_VAL_GET(pNode) <= 0) {
H
Haojun Liao 已提交
672
    taosCacheReleaseNode(pCacheObj, pNode);
H
Haojun Liao 已提交
673

H
Haojun Liao 已提交
674 675 676
    // this node should be remove from hash table
    return false;
  }
677

H
Haojun Liao 已提交
678
  if (ps->fp) {
A
AlexDuan 已提交
679
    (ps->fp)(pNode->data, ps->param1);
680 681
  }

H
Haojun Liao 已提交
682 683 684 685
  // do not remove element in hash table
  return true;
}

S
cache  
Shengliang Guan 已提交
686
/**
 * Prepare the traversal context for one refresh pass over the hash table.
 * NOTE(review): the traversal call itself is disabled (kept below as a comment), so this
 * function currently only builds the context.
 * @param pCacheObj cache object, must not be NULL
 * @param time      timestamp that node expire times are compared against
 * @param fp        optional callback for nodes that are kept
 * @param param1    opaque argument forwarded to fp
 */
static void doCacheRefresh(SCacheObj *pCacheObj, int64_t time, __cache_trav_fn_t fp, void *param1) {
  assert(pCacheObj != NULL);

  SHashTravSupp sup = {0};
  sup.pCacheObj = pCacheObj;
  sup.time = time;
  sup.fp = fp;
  sup.param1 = param1;
  //  taosHashCondTraverse(pCacheObj->pHashTable, travHashTableFn, &sup);
}

693
void taosCacheRefreshWorkerUnexpectedStopped(void) {
S
cache  
Shengliang Guan 已提交
694 695
  if (!refreshWorkerNormalStopped) {
    refreshWorkerUnexpectedStopped = true;
696 697 698
  }
}

S
cache  
Shengliang Guan 已提交
699
void *taosCacheTimedRefresh(void *handle) {
700 701
  assert(pCacheArrayList != NULL);
  uDebug("cache refresh thread starts");
702

H
Haojun Liao 已提交
703
  setThreadName("cacheRefresh");
704

S
cache  
Shengliang Guan 已提交
705 706
  const int32_t SLEEP_DURATION = 500;  // 500 ms
  int64_t       count = 0;
707
  atexit(taosCacheRefreshWorkerUnexpectedStopped);
708

S
cache  
Shengliang Guan 已提交
709
  while (1) {
710
    taosMsleep(SLEEP_DURATION);
711 712 713
    if (stopRefreshWorker) {
      goto _end;
    }
H
Haojun Liao 已提交
714

715 716 717
    pthread_mutex_lock(&guard);
    size_t size = taosArrayGetSize(pCacheArrayList);
    pthread_mutex_unlock(&guard);
718

719
    count += 1;
720

S
cache  
Shengliang Guan 已提交
721
    for (int32_t i = 0; i < size; ++i) {
722
      pthread_mutex_lock(&guard);
S
cache  
Shengliang Guan 已提交
723
      SCacheObj *pCacheObj = taosArrayGetP(pCacheArrayList, i);
724

725
      if (pCacheObj == NULL) {
726 727 728
        uError("object is destroyed. ignore and try next");
        pthread_mutex_unlock(&guard);
        continue;
729
      }
730

731 732
      // check if current cache object will be deleted every 500ms.
      if (pCacheObj->deleting) {
733 734 735
        taosArrayRemove(pCacheArrayList, i);
        size = taosArrayGetSize(pCacheArrayList);

S
cache  
Shengliang Guan 已提交
736 737
        uDebug("%s is destroying, remove it from refresh list, remain cache obj:%" PRIzu, pCacheObj->name, size);
        pCacheObj->deleting = 0;  // reset the deleting flag to enable pCacheObj to continue releasing resources.
738

739 740
        pthread_mutex_unlock(&guard);
        continue;
741
      }
742

743 744
      pthread_mutex_unlock(&guard);

745 746 747 748 749 750 751 752 753 754 755 756 757 758 759
      if ((count % pCacheObj->checkTick) != 0) {
        continue;
      }

      size_t elemInHash = taosHashGetSize(pCacheObj->pHashTable);
      if (elemInHash + pCacheObj->numOfElemsInTrash == 0) {
        continue;
      }

      uDebug("%s refresh thread scan", pCacheObj->name);
      pCacheObj->statistics.refreshCount++;

      // refresh data in hash table
      if (elemInHash > 0) {
        int64_t now = taosGetTimestampMs();
A
AlexDuan 已提交
760
        doCacheRefresh(pCacheObj, now, NULL, NULL);
761 762 763 764
      }

      taosTrashcanEmpty(pCacheObj, false);
    }
765 766
  }

S
cache  
Shengliang Guan 已提交
767
_end:
768
  taosArrayDestroy(pCacheArrayList);
H
Haojun Liao 已提交
769 770 771

  pCacheArrayList = NULL;
  pthread_mutex_destroy(&guard);
S
cache  
Shengliang Guan 已提交
772
  refreshWorkerNormalStopped = true;
H
Haojun Liao 已提交
773

774
  uDebug("cache refresh thread quits");
775
  return NULL;
dengyihao's avatar
dengyihao 已提交
776
}
777

S
cache  
Shengliang Guan 已提交
778
/**
 * Run one refresh pass on a cache object using the current time as the expiry reference.
 * @param pCacheObj cache object, a NULL pointer is ignored
 * @param fp        optional callback handed to the refresh pass
 * @param param1    opaque argument forwarded to fp
 */
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void *param1) {
  if (pCacheObj != NULL) {
    doCacheRefresh(pCacheObj, taosGetTimestampMs(), fp, param1);
  }
}
786

S
cache  
Shengliang Guan 已提交
787
/**
 * Ask the background refresh thread to stop. The thread polls the flag after each sleep,
 * so it quits with a delay.
 */
void taosStopCacheRefreshWorker(void) {
  stopRefreshWorker = true;
}