tcache.c 16.0 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tcache.h"
17 18 19
#include "hash.h"
#include "hashfunc.h"

H
hzcheng 已提交
20 21 22 23 24
#include "tlog.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"

25
static FORCE_INLINE void __cache_wr_lock(SCacheObj *pCacheObj) {
26
#if defined(LINUX)
27
  pthread_rwlock_wrlock(&pCacheObj->lock);
28
#else
29
  pthread_mutex_lock(&pCacheObj->lock);
30 31 32
#endif
}

33
static FORCE_INLINE void __cache_rd_lock(SCacheObj *pCacheObj) {
34
#if defined(LINUX)
35
  pthread_rwlock_rdlock(&pCacheObj->lock);
36
#else
37
  pthread_mutex_lock(&pCacheObj->lock);
38 39 40
#endif
}

41
static FORCE_INLINE void __cache_unlock(SCacheObj *pCacheObj) {
42
#if defined(LINUX)
43
  pthread_rwlock_unlock(&pCacheObj->lock);
44
#else
45
  pthread_mutex_unlock(&pCacheObj->lock);
46 47 48
#endif
}

49
static FORCE_INLINE int32_t __cache_lock_init(SCacheObj *pCacheObj) {
50
#if defined(LINUX)
51
  return pthread_rwlock_init(&pCacheObj->lock, NULL);
52
#else
53
  return pthread_mutex_init(&pCacheObj->lock, NULL);
54 55 56
#endif
}

57
static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pCacheObj) {
58
#if defined(LINUX)
59
  pthread_rwlock_destroy(&pCacheObj->lock);
60
#else
61
  pthread_mutex_destroy(&pCacheObj->lock);
62 63 64
#endif
}

65 66 67
static FORCE_INLINE void taosFreeNode(void *data) {
  SCacheDataNode *pNode = *(SCacheDataNode **)data;
  free(pNode);
H
hzcheng 已提交
68 69 70 71 72 73 74 75
}

/**
 * @param key      key of object for hash, usually a null-terminated string
 * @param keyLen   length of key
 * @param pData    actually data. required a consecutive memory block, no pointer is allowed
 *                 in pData. Pointer copy causes memory access error.
 * @param size     size of block
76 77
 * @param lifespan total survial expiredTime from now
 * @return         SCacheDataNode
H
hzcheng 已提交
78
 */
79 80 81 82 83
static SCacheDataNode *taosCreateHashNode(const char *key, size_t keyLen, const char *pData, size_t size,
                                          uint64_t duration) {
  size_t totalSize = size + sizeof(SCacheDataNode) + keyLen;
  
  SCacheDataNode *pNewNode = calloc(1, totalSize);
H
hzcheng 已提交
84 85 86 87
  if (pNewNode == NULL) {
    pError("failed to allocate memory, reason:%s", strerror(errno));
    return NULL;
  }
88 89 90 91 92 93 94 95 96 97 98
  
  memcpy(pNewNode->data, pData, size);
  
  pNewNode->key = (char *)pNewNode + sizeof(SCacheDataNode) + size;
  pNewNode->keySize = keyLen;
  
  memcpy(pNewNode->key, key, keyLen);
  
  pNewNode->addedTime = (uint64_t)taosGetTimestampMs();
  pNewNode->expiredTime = pNewNode->addedTime + duration;
  
H
hzcheng 已提交
99
  pNewNode->signature = (uint64_t)pNewNode;
100 101
  pNewNode->size = (uint32_t)totalSize;
  
H
hzcheng 已提交
102 103 104 105
  return pNewNode;
}

/**
106
 * addedTime object node into trash, and this object is closed for referencing if it is addedTime to trash
H
hzcheng 已提交
107
 * It will be removed until the pNode->refCount == 0
108
 * @param pCacheObj    Cache object
H
hzcheng 已提交
109 110
 * @param pNode   Cache slot object
 */
111
static void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
112
  if (pNode->inTrash) { /* node is already in trash */
H
hzcheng 已提交
113 114
    return;
  }
115 116 117 118
  
  STrashElem *pElem = calloc(1, sizeof(STrashElem));
  pElem->pData = pNode;
  
119 120 121
  pElem->next = pCacheObj->pTrash;
  if (pCacheObj->pTrash) {
    pCacheObj->pTrash->prev = pElem;
H
hzcheng 已提交
122
  }
123 124
  
  pElem->prev = NULL;
125
  pCacheObj->pTrash = pElem;
126 127
  
  pNode->inTrash = true;
128
  pCacheObj->numOfElemsInTrash++;
129
  
130
  pTrace("key:%s %p move to trash, numOfElem in trash:%d", pNode->key, pNode, pCacheObj->numOfElemsInTrash);
H
hzcheng 已提交
131 132
}

133
static void taosRemoveFromTrash(SCacheObj *pCacheObj, STrashElem *pElem) {
134 135
  if (pElem->pData->signature != (uint64_t)pElem->pData) {
    pError("key:sig:%d %p data has been released, ignore", pElem->pData->signature, pElem->pData);
H
hzcheng 已提交
136 137
    return;
  }
138
  
139
  pCacheObj->numOfElemsInTrash--;
140 141 142
  if (pElem->prev) {
    pElem->prev->next = pElem->next;
  } else { /* pnode is the header, update header */
143
    pCacheObj->pTrash = pElem->next;
H
hzcheng 已提交
144
  }
145 146 147
  
  if (pElem->next) {
    pElem->next->prev = pElem->prev;
H
hzcheng 已提交
148
  }
149 150 151 152
  
  pElem->pData->signature = 0;
  free(pElem->pData);
  free(pElem);
H
hzcheng 已提交
153 154 155 156
}
/**
 * remove nodes in trash with refCount == 0 in cache
 * @param pNode
157
 * @param pCacheObj
H
hzcheng 已提交
158 159 160
 * @param force   force model, if true, remove data in trash without check refcount.
 *                may cause corruption. So, forece model only applys before cache is closed
 */
161 162
static void taosTrashEmpty(SCacheObj *pCacheObj, bool force) {
  __cache_wr_lock(pCacheObj);
163
  
164 165 166
  if (pCacheObj->numOfElemsInTrash == 0) {
    if (pCacheObj->pTrash != NULL) {
      pError("key:inconsistency data in cache, numOfElem in trash:%d", pCacheObj->numOfElemsInTrash);
H
hzcheng 已提交
167
    }
168
    pCacheObj->pTrash = NULL;
169
    
170
    __cache_unlock(pCacheObj);
H
hzcheng 已提交
171 172
    return;
  }
173
  
174
  STrashElem *pElem = pCacheObj->pTrash;
175 176 177 178 179
  
  while (pElem) {
    T_REF_VAL_CHECK(pElem->pData);
    if (pElem->next == pElem) {
      pElem->next = NULL;
H
hzcheng 已提交
180
    }
181 182 183
    
    if (force || (T_REF_VAL_GET(pElem->pData) == 0)) {
      pTrace("key:%s %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData,
184
             pCacheObj->numOfElemsInTrash - 1);
185 186 187
      STrashElem *p = pElem;
      
      pElem = pElem->next;
188
      taosRemoveFromTrash(pCacheObj, p);
H
hzcheng 已提交
189
    } else {
190
      pElem = pElem->next;
H
hzcheng 已提交
191 192
    }
  }
193
  
194 195
  assert(pCacheObj->numOfElemsInTrash >= 0);
  __cache_unlock(pCacheObj);
H
hzcheng 已提交
196 197 198 199
}

/**
 * release node
200
 * @param pCacheObj      cache object
H
hzcheng 已提交
201 202
 * @param pNode     data node
 */
203
static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
H
hzcheng 已提交
204 205 206 207
  if (pNode->signature != (uint64_t)pNode) {
    pError("key:%s, %p data is invalid, or has been released", pNode->key, pNode);
    return;
  }
208
  
H
hjxilinx 已提交
209
  int32_t size = pNode->size;
210
  taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
211
  
212
  pTrace("key:%s is removed from cache,total:%d,size:%ldbytes", pNode->key, pCacheObj->totalSize, size);
H
hzcheng 已提交
213 214 215 216 217
  free(pNode);
}

/**
 * move the old node into trash
218
 * @param pCacheObj
H
hzcheng 已提交
219 220
 * @param pNode
 */
221 222 223
static FORCE_INLINE void taosCacheMoveToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
  taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
  taosAddToTrash(pCacheObj, pNode);
H
hzcheng 已提交
224 225 226 227
}

/**
 * update data in cache
228
 * @param pCacheObj
H
hzcheng 已提交
229 230 231 232 233 234 235
 * @param pNode
 * @param key
 * @param keyLen
 * @param pData
 * @param dataSize
 * @return
 */
236
static SCacheDataNode *taosUpdateCacheImpl(SCacheObj *pCacheObj, SCacheDataNode *pNode, char *key, int32_t keyLen,
237 238 239
                                           void *pData, uint32_t dataSize, uint64_t duration) {
  SCacheDataNode *pNewNode = NULL;
  
H
hjxilinx 已提交
240
  // only a node is not referenced by any other object, in-place update it
241 242 243 244
  if (T_REF_VAL_GET(pNode) == 0) {
    size_t newSize = sizeof(SCacheDataNode) + dataSize + keyLen;
    
    pNewNode = (SCacheDataNode *)realloc(pNode, newSize);
H
hzcheng 已提交
245 246 247
    if (pNewNode == NULL) {
      return NULL;
    }
248
    
H
hzcheng 已提交
249 250
    pNewNode->signature = (uint64_t)pNewNode;
    memcpy(pNewNode->data, pData, dataSize);
251 252 253 254 255
    
    pNewNode->key = (char *)pNewNode + sizeof(SCacheDataNode) + dataSize;
    pNewNode->keySize = keyLen;
    memcpy(pNewNode->key, key, keyLen);
    
H
hjxilinx 已提交
256
    // update the timestamp information for updated key/value
257 258 259 260 261
    pNewNode->addedTime = taosGetTimestampMs();
    pNewNode->expiredTime = pNewNode->addedTime + duration;
    
    T_REF_INC(pNewNode);
    
H
hjxilinx 已提交
262
    // the address of this node may be changed, so the prev and next element should update the corresponding pointer
263
    taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNewNode, sizeof(void *));
H
hzcheng 已提交
264
  } else {
265
    taosCacheMoveToTrash(pCacheObj, pNode);
266 267
    
    pNewNode = taosCreateHashNode(key, keyLen, pData, dataSize, duration);
H
hzcheng 已提交
268 269 270
    if (pNewNode == NULL) {
      return NULL;
    }
271 272 273 274
    
    T_REF_INC(pNewNode);
    
    // addedTime new element to hashtable
275
    taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNewNode, sizeof(void *));
H
hzcheng 已提交
276
  }
277
  
H
hzcheng 已提交
278 279 280 281
  return pNewNode;
}

/**
282
 * addedTime data into hash table
H
hzcheng 已提交
283 284 285
 * @param key
 * @param pData
 * @param size
286
 * @param pCacheObj
H
hzcheng 已提交
287 288 289 290
 * @param keyLen
 * @param pNode
 * @return
 */
291
static FORCE_INLINE SCacheDataNode *taosAddToCacheImpl(SCacheObj *pCacheObj, char *key, size_t keyLen, const void *pData,
292 293
                                                       size_t dataSize, uint64_t duration) {
  SCacheDataNode *pNode = taosCreateHashNode(key, keyLen, pData, dataSize, duration);
H
hzcheng 已提交
294 295 296
  if (pNode == NULL) {
    return NULL;
  }
297 298
  
  T_REF_INC(pNode);
299
  taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode, sizeof(void *));
H
hzcheng 已提交
300 301 302
  return pNode;
}

303 304
static void doCleanupDataCache(SCacheObj *pCacheObj) {
  __cache_wr_lock(pCacheObj);
305
  
306 307
  if (taosHashGetSize(pCacheObj->pHashTable) > 0) {
    taosHashCleanup(pCacheObj->pHashTable);
308
  }
309
  
310
  __cache_unlock(pCacheObj);
311
  
312 313
  taosTrashEmpty(pCacheObj, true);
  __cache_lock_destroy(pCacheObj);
314
  
315 316
  memset(pCacheObj, 0, sizeof(SCacheObj));
  free(pCacheObj);
317 318
}

H
hzcheng 已提交
319
/**
320
 * refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime
H
hzcheng 已提交
321 322
 * @param handle   Cache object handle
 */
323
static void taosCacheRefresh(void *handle, void *tmrId) {
324
  SCacheObj *pCacheObj = (SCacheObj *)handle;
325
  
326
  if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) {
327 328 329
    pTrace("object is destroyed. no refresh retry");
    return;
  }
330
  
331 332
  if (pCacheObj->deleting == 1) {
    doCleanupDataCache(pCacheObj);
H
hzcheng 已提交
333 334
    return;
  }
335 336
  
  uint64_t expiredTime = taosGetTimestampMs();
337
  pCacheObj->statistics.refreshCount++;
338
  
339
  SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);
340
  
341
  __cache_wr_lock(pCacheObj);
342
  while (taosHashIterNext(pIter)) {
343
    if (pCacheObj->deleting == 1) {
344
      taosHashDestroyIter(pIter);
345
      break;
346
    }
347 348 349
    
    SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
    if (pNode->expiredTime <= expiredTime && T_REF_VAL_GET(pNode) <= 0) {
350
      taosCacheReleaseNode(pCacheObj, pNode);
H
hzcheng 已提交
351 352
    }
  }
353
  
354
  __cache_unlock(pCacheObj);
355 356 357
  
  taosHashDestroyIter(pIter);
  
358 359
  if (pCacheObj->deleting == 1) {  // clean up resources and abort
    doCleanupDataCache(pCacheObj);
360
  } else {
361 362
    taosTrashEmpty(pCacheObj, false);
    taosTmrReset(taosCacheRefresh, pCacheObj->refreshTime, pCacheObj, pCacheObj->tmrCtrl, &pCacheObj->pTimer);
H
hzcheng 已提交
363 364 365
  }
}

366 367
SCacheObj *taosCacheInit(void *tmrCtrl, int64_t refreshTime) {
  if (tmrCtrl == NULL || refreshTime <= 0) {
H
hzcheng 已提交
368 369
    return NULL;
  }
370
  
371 372
  SCacheObj *pCacheObj = (SCacheObj *)calloc(1, sizeof(SCacheObj));
  if (pCacheObj == NULL) {
H
hzcheng 已提交
373 374 375
    pError("failed to allocate memory, reason:%s", strerror(errno));
    return NULL;
  }
376
  
377 378 379
  pCacheObj->pHashTable = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false);
  if (pCacheObj->pHashTable == NULL) {
    free(pCacheObj);
H
hzcheng 已提交
380 381 382
    pError("failed to allocate memory, reason:%s", strerror(errno));
    return NULL;
  }
383 384
  
  // set free cache node callback function for hash table
385
  taosHashSetFreecb(pCacheObj->pHashTable, taosFreeNode);
386
  
387 388
  pCacheObj->refreshTime = refreshTime * 1000;
  pCacheObj->tmrCtrl = tmrCtrl;
389
  
390
  taosTmrReset(taosCacheRefresh, pCacheObj->refreshTime, pCacheObj, pCacheObj->tmrCtrl, &pCacheObj->pTimer);
391
  
392 393 394 395
  if (__cache_lock_init(pCacheObj) != 0) {
    taosTmrStopA(&pCacheObj->pTimer);
    taosHashCleanup(pCacheObj->pHashTable);
    free(pCacheObj);
396
    
H
hzcheng 已提交
397 398 399
    pError("failed to init lock, reason:%s", strerror(errno));
    return NULL;
  }
400
  
401
  return pCacheObj;
402
}
H
hzcheng 已提交
403

404
void *taosCachePut(SCacheObj *pCacheObj, char *key, void *pData, size_t dataSize, int duration) {
405 406
  SCacheDataNode *pNode;
  
407
  if (pCacheObj == NULL || pCacheObj->pHashTable == NULL) {
408 409 410 411 412
    return NULL;
  }
  
  size_t keyLen = strlen(key);
  
413 414
  __cache_wr_lock(pCacheObj);
  SCacheDataNode **pt = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
415 416 417
  SCacheDataNode * pOld = (pt != NULL) ? (*pt) : NULL;
  
  if (pOld == NULL) {  // do addedTime to cache
418
    pNode = taosAddToCacheImpl(pCacheObj, key, keyLen, pData, dataSize, duration * 1000L);
419
    if (NULL != pNode) {
420
      pCacheObj->totalSize += pNode->size;
H
hjxilinx 已提交
421 422
      
      pTrace("key:%s %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", total:%d, size:%" PRId64 " bytes",
423
             key, pNode, pNode->addedTime, pNode->expiredTime, pCacheObj->totalSize, dataSize);
424 425
    }
  } else {  // old data exists, update the node
426
    pNode = taosUpdateCacheImpl(pCacheObj, pOld, key, keyLen, pData, dataSize, duration * 1000L);
427 428 429
    pTrace("key:%s %p exist in cache, updated", key, pNode);
  }
  
430
  __cache_unlock(pCacheObj);
431 432
  
  return (pNode != NULL) ? pNode->data : NULL;
H
hzcheng 已提交
433 434
}

435 436
void *taosCacheAcquireByName(SCacheObj *pCacheObj, const char *key) {
  if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) {
437
    return NULL;
438
  }
439 440 441
  
  uint32_t keyLen = (uint32_t)strlen(key);
  
442
  __cache_rd_lock(pCacheObj);
443
  
444
  SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
445 446 447 448
  if (ptNode != NULL) {
    T_REF_INC(*ptNode);
  }
  
449
  __cache_unlock(pCacheObj);
450 451
  
  if (ptNode != NULL) {
452
    atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
453 454
    pTrace("key:%s is retrieved from cache,refcnt:%d", key, T_REF_VAL_GET(*ptNode));
  } else {
455
    atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
456 457 458
    pTrace("key:%s not in cache,retrieved failed", key);
  }
  
459
  atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
460
  return (ptNode != NULL) ? (*ptNode)->data : NULL;
461 462
}

463 464
void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
  if (pCacheObj == NULL || data == NULL) return NULL;
465
  
466 467
  size_t          offset = offsetof(SCacheDataNode, data);
  SCacheDataNode *ptNode = (SCacheDataNode *)((char *)data - offset);
468
  
469
  if (ptNode->signature != (uint64_t)ptNode) {
470 471 472 473
    pError("key: %p the data from cache is invalid", ptNode);
    return NULL;
  }
  
474 475
  int32_t ref = T_REF_INC(ptNode);
  pTrace("%p addedTime ref data in cache, refCnt:%d", data, ref)
476 477 478 479 480 481
  
  // the data if referenced by at least one object, so the reference count must be greater than the value of 2.
  assert(ref >= 2);
  return data;
}

482 483
void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) {
  if (pCacheObj == NULL || data == NULL) return NULL;
484
  
485 486
  size_t          offset = offsetof(SCacheDataNode, data);
  SCacheDataNode *ptNode = (SCacheDataNode *)((char *)(*data) - offset);
487
  
488
  if (ptNode->signature != (uint64_t)ptNode) {
489 490 491 492
    pError("key: %p the data from cache is invalid", ptNode);
    return NULL;
  }
  
493
  assert(T_REF_VAL_GET(ptNode) >= 1);
494
  
495
  char *d = *data;
496 497 498 499 500
  
  // clear its reference to old area
  *data = NULL;
  
  return d;
H
hzcheng 已提交
501
}
502

503 504
void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
  if (pCacheObj == NULL || (*data) == NULL || (taosHashGetSize(pCacheObj->pHashTable) + pCacheObj->numOfElemsInTrash == 0)) {
505 506 507 508 509 510 511 512 513 514 515 516 517 518 519
    return;
  }
  
  size_t offset = offsetof(SCacheDataNode, data);
  
  SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset);
  
  if (pNode->signature != (uint64_t)pNode) {
    pError("key: %p release invalid cache data", pNode);
    return;
  }
  
  *data = NULL;
  
  if (_remove) {
520
    __cache_wr_lock(pCacheObj);
521 522 523
    // pNode may be released immediately by other thread after the reference count of pNode is set to 0,
    // So we need to lock it in the first place.
    T_REF_DEC(pNode);
524
    taosCacheMoveToTrash(pCacheObj, pNode);
525
    
526
    __cache_unlock(pCacheObj);
527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557
  } else {
    T_REF_DEC(pNode);
  }
}

void taosCacheEmpty(SCacheObj *pCacheObj) {
  SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);
  
  __cache_wr_lock(pCacheObj);
  while (taosHashIterNext(pIter)) {
    if (pCacheObj->deleting == 1) {
      taosHashDestroyIter(pIter);
      break;
    }
    
    SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
    taosCacheMoveToTrash(pCacheObj, pNode);
  }
  __cache_unlock(pCacheObj);
  
  taosHashDestroyIter(pIter);
  taosTrashEmpty(pCacheObj, false);
}

void taosCacheCleanup(SCacheObj *pCacheObj) {
  if (pCacheObj == NULL) {
    return;
  }
  
  pCacheObj->deleting = 1;
}