tlrucache.c 23.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "tlrucache.h"
#include "os.h"
#include "taoserror.h"
#include "tarray.h"
M
Minglei Jin 已提交
21 22
#include "tdef.h"
#include "tlog.h"
23

M
Minglei Jin 已提交
24
typedef struct SLRUEntry      SLRUEntry;
25 26
typedef struct SLRUEntryTable SLRUEntryTable;
typedef struct SLRUCacheShard SLRUCacheShard;
M
Minglei Jin 已提交
27
typedef struct SShardedCache  SShardedCache;
28 29

enum {
M
Minglei Jin 已提交
30
  TAOS_LRU_IN_CACHE = (1 << 0),  // Whether this entry is referenced by the hash table.
31

M
Minglei Jin 已提交
32
  TAOS_LRU_IS_HIGH_PRI = (1 << 1),  // Whether this entry is high priority entry.
33

M
Minglei Jin 已提交
34
  TAOS_LRU_IN_HIGH_PRI_POOL = (1 << 2),  // Whether this entry is in high-pri pool.
35

M
Minglei Jin 已提交
36
  TAOS_LRU_HAS_HIT = (1 << 3),  // Whether this entry has had any lookups (hits).
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
};

struct SLRUEntry {
  void               *value;
  _taos_lru_deleter_t deleter;
  SLRUEntry          *nextHash;
  SLRUEntry          *next;
  SLRUEntry          *prev;
  size_t              totalCharge;
  size_t              keyLength;
  uint32_t            hash;
  uint32_t            refs;
  uint8_t             flags;
  char                keyData[1];
};

M
Minglei Jin 已提交
53
#define TAOS_LRU_ENTRY_IN_CACHE(h)     ((h)->flags & TAOS_LRU_IN_CACHE)
54
#define TAOS_LRU_ENTRY_IN_HIGH_POOL(h) ((h)->flags & TAOS_LRU_IN_HIGH_PRI_POOL)
M
Minglei Jin 已提交
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
#define TAOS_LRU_ENTRY_IS_HIGH_PRI(h)  ((h)->flags & TAOS_LRU_IS_HIGH_PRI)
#define TAOS_LRU_ENTRY_HAS_HIT(h)      ((h)->flags & TAOS_LRU_HAS_HIT)

#define TAOS_LRU_ENTRY_SET_IN_CACHE(h, inCache) \
  do {                                          \
    if (inCache) {                              \
      (h)->flags |= TAOS_LRU_IN_CACHE;          \
    } else {                                    \
      (h)->flags &= ~TAOS_LRU_IN_CACHE;         \
    }                                           \
  } while (0)
#define TAOS_LRU_ENTRY_SET_IN_HIGH_POOL(h, inHigh) \
  do {                                             \
    if (inHigh) {                                  \
      (h)->flags |= TAOS_LRU_IN_HIGH_PRI_POOL;     \
    } else {                                       \
      (h)->flags &= ~TAOS_LRU_IN_HIGH_PRI_POOL;    \
    }                                              \
  } while (0)
#define TAOS_LRU_ENTRY_SET_PRIORITY(h, priority) \
  do {                                           \
    if (priority == TAOS_LRU_PRIORITY_HIGH) {    \
      (h)->flags |= TAOS_LRU_IS_HIGH_PRI;        \
    } else {                                     \
      (h)->flags &= ~TAOS_LRU_IS_HIGH_PRI;       \
    }                                            \
  } while (0)
82 83 84
#define TAOS_LRU_ENTRY_SET_HIT(h) ((h)->flags |= TAOS_LRU_HAS_HIT)

#define TAOS_LRU_ENTRY_HAS_REFS(h) ((h)->refs > 0)
M
Minglei Jin 已提交
85
#define TAOS_LRU_ENTRY_REF(h)      (++(h)->refs)
86 87

static bool taosLRUEntryUnref(SLRUEntry *entry) {
H
Haojun Liao 已提交
88
  ASSERT(entry->refs > 0);
89 90 91 92 93
  --entry->refs;
  return entry->refs == 0;
}

static void taosLRUEntryFree(SLRUEntry *entry) {
H
Haojun Liao 已提交
94
  ASSERT(entry->refs == 0);
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113

  if (entry->deleter) {
    (*entry->deleter)(entry->keyData, entry->keyLength, entry->value);
  }

  taosMemoryFree(entry);
}

typedef void (*_taos_lru_table_func_t)(SLRUEntry *entry);

struct SLRUEntryTable {
  int         lengthBits;
  SLRUEntry **list;
  uint32_t    elems;
  int         maxLengthBits;
};

static int taosLRUEntryTableInit(SLRUEntryTable *table, int maxUpperHashBits) {
  table->lengthBits = 4;
M
Minglei Jin 已提交
114
  table->list = taosMemoryCalloc(1 << table->lengthBits, sizeof(SLRUEntry *));
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
  if (!table->list) {
    return -1;
  }

  table->elems = 0;
  table->maxLengthBits = maxUpperHashBits;

  return 0;
}

static void taosLRUEntryTableApply(SLRUEntryTable *table, _taos_lru_table_func_t func, uint32_t begin, uint32_t end) {
  for (uint32_t i = begin; i < end; ++i) {
    SLRUEntry *h = table->list[i];
    while (h) {
      SLRUEntry *n = h->nextHash;
H
Haojun Liao 已提交
130
      ASSERT(TAOS_LRU_ENTRY_IN_CACHE(h));
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
      func(h);
      h = n;
    }
  }
}

static void taosLRUEntryTableFree(SLRUEntry *entry) {
  if (!TAOS_LRU_ENTRY_HAS_REFS(entry)) {
    taosLRUEntryFree(entry);
  }
}

static void taosLRUEntryTableCleanup(SLRUEntryTable *table) {
  taosLRUEntryTableApply(table, taosLRUEntryTableFree, 0, 1 << table->lengthBits);

  taosMemoryFree(table->list);
}

M
Minglei Jin 已提交
149
static SLRUEntry **taosLRUEntryTableFindPtr(SLRUEntryTable *table, const void *key, size_t keyLen, uint32_t hash) {
150 151 152 153 154 155 156 157
  SLRUEntry **entry = &table->list[hash >> (32 - table->lengthBits)];
  while (*entry && ((*entry)->hash != hash || memcmp(key, (*entry)->keyData, keyLen) != 0)) {
    entry = &(*entry)->nextHash;
  }

  return entry;
}

M
Minglei Jin 已提交
158
static void taosLRUEntryTableResize(SLRUEntryTable *table) {
159 160 161 162 163 164 165 166 167
  int lengthBits = table->lengthBits;
  if (lengthBits >= table->maxLengthBits) {
    return;
  }

  if (lengthBits >= 31) {
    return;
  }

M
Minglei Jin 已提交
168 169 170
  uint32_t    oldLength = 1 << lengthBits;
  int         newLengthBits = lengthBits + 1;
  SLRUEntry **newList = taosMemoryCalloc(1 << newLengthBits, sizeof(SLRUEntry *));
171 172 173 174 175 176 177
  if (!newList) {
    return;
  }
  uint32_t count = 0;
  for (uint32_t i = 0; i < oldLength; ++i) {
    SLRUEntry *entry = table->list[i];
    while (entry) {
M
Minglei Jin 已提交
178 179
      SLRUEntry  *next = entry->nextHash;
      uint32_t    hash = entry->hash;
180 181 182 183 184 185 186
      SLRUEntry **ptr = &newList[hash >> (32 - newLengthBits)];
      entry->nextHash = *ptr;
      *ptr = entry;
      entry = next;
      ++count;
    }
  }
H
Haojun Liao 已提交
187
  ASSERT(table->elems == count);
188 189 190 191 192 193

  taosMemoryFree(table->list);
  table->list = newList;
  table->lengthBits = newLengthBits;
}

M
Minglei Jin 已提交
194
static SLRUEntry *taosLRUEntryTableLookup(SLRUEntryTable *table, const void *key, size_t keyLen, uint32_t hash) {
195 196 197
  return *taosLRUEntryTableFindPtr(table, key, keyLen, hash);
}

M
Minglei Jin 已提交
198
static SLRUEntry *taosLRUEntryTableInsert(SLRUEntryTable *table, SLRUEntry *entry) {
199
  SLRUEntry **ptr = taosLRUEntryTableFindPtr(table, entry->keyData, entry->keyLength, entry->hash);
M
Minglei Jin 已提交
200
  SLRUEntry  *old = *ptr;
201 202 203 204 205 206 207 208 209 210 211 212
  entry->nextHash = (old == NULL) ? NULL : old->nextHash;
  *ptr = entry;
  if (old == NULL) {
    ++table->elems;
    if ((table->elems >> table->lengthBits) > 0) {
      taosLRUEntryTableResize(table);
    }
  }

  return old;
}

M
Minglei Jin 已提交
213
static SLRUEntry *taosLRUEntryTableRemove(SLRUEntryTable *table, const void *key, size_t keyLen, uint32_t hash) {
214
  SLRUEntry **entry = taosLRUEntryTableFindPtr(table, key, keyLen, hash);
M
Minglei Jin 已提交
215
  SLRUEntry  *result = *entry;
216 217 218 219 220 221 222 223 224
  if (result) {
    *entry = result->nextHash;
    --table->elems;
  }

  return result;
}

struct SLRUCacheShard {
M
Minglei Jin 已提交
225 226 227 228 229 230 231 232 233 234 235
  size_t         capacity;
  size_t         highPriPoolUsage;
  bool           strictCapacity;
  double         highPriPoolRatio;
  double         highPriPoolCapacity;
  SLRUEntry      lru;
  SLRUEntry     *lruLowPri;
  SLRUEntryTable table;
  size_t         usage;     // Memory size for entries residing in the cache.
  size_t         lruUsage;  // Memory size for entries residing only in the LRU list.
  TdThreadMutex  mutex;
236 237 238 239 240 241 242
};

#define TAOS_LRU_CACHE_SHARD_HASH32(key, len) (MurmurHash3_32((key), (len)))

static void taosLRUCacheShardMaintainPoolSize(SLRUCacheShard *shard) {
  while (shard->highPriPoolUsage > shard->highPriPoolCapacity) {
    shard->lruLowPri = shard->lruLowPri->next;
H
Haojun Liao 已提交
243
    ASSERT(shard->lruLowPri != &shard->lru);
244 245
    TAOS_LRU_ENTRY_SET_IN_HIGH_POOL(shard->lruLowPri, false);

H
Haojun Liao 已提交
246
    ASSERT(shard->highPriPoolUsage >= shard->lruLowPri->totalCharge);
247 248 249 250 251
    shard->highPriPoolUsage -= shard->lruLowPri->totalCharge;
  }
}

static void taosLRUCacheShardLRUInsert(SLRUCacheShard *shard, SLRUEntry *e) {
H
Haojun Liao 已提交
252
  ASSERT(e->next == NULL && e->prev == NULL);
253

M
Minglei Jin 已提交
254
  if (shard->highPriPoolRatio > 0 && (TAOS_LRU_ENTRY_IS_HIGH_PRI(e) || TAOS_LRU_ENTRY_HAS_HIT(e))) {
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269
    e->next = &shard->lru;
    e->prev = shard->lru.prev;

    e->prev->next = e;
    e->next->prev = e;

    TAOS_LRU_ENTRY_SET_IN_HIGH_POOL(e, true);
    shard->highPriPoolUsage += e->totalCharge;
    taosLRUCacheShardMaintainPoolSize(shard);
  } else {
    e->next = shard->lruLowPri->next;
    e->prev = shard->lruLowPri;

    e->prev->next = e;
    e->next->prev = e;
M
Minglei Jin 已提交
270

271 272 273 274 275 276 277 278
    TAOS_LRU_ENTRY_SET_IN_HIGH_POOL(e, false);
    shard->lruLowPri = e;
  }

  shard->lruUsage += e->totalCharge;
}

static void taosLRUCacheShardLRURemove(SLRUCacheShard *shard, SLRUEntry *e) {
H
Haojun Liao 已提交
279
  ASSERT(e->next && e->prev);
280 281 282 283 284 285 286 287

  if (shard->lruLowPri == e) {
    shard->lruLowPri = e->prev;
  }
  e->next->prev = e->prev;
  e->prev->next = e->next;
  e->prev = e->next = NULL;

H
Haojun Liao 已提交
288
  ASSERT(shard->lruUsage >= e->totalCharge);
289 290
  shard->lruUsage -= e->totalCharge;
  if (TAOS_LRU_ENTRY_IN_HIGH_POOL(e)) {
H
Haojun Liao 已提交
291
    ASSERT(shard->highPriPoolUsage >= e->totalCharge);
292 293 294 295 296 297 298
    shard->highPriPoolUsage -= e->totalCharge;
  }
}

static void taosLRUCacheShardEvictLRU(SLRUCacheShard *shard, size_t charge, SArray *deleted) {
  while (shard->usage + charge > shard->capacity && shard->lru.next != &shard->lru) {
    SLRUEntry *old = shard->lru.next;
H
Haojun Liao 已提交
299
    ASSERT(TAOS_LRU_ENTRY_IN_CACHE(old) && !TAOS_LRU_ENTRY_HAS_REFS(old));
300 301 302 303 304

    taosLRUCacheShardLRURemove(shard, old);
    taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash);

    TAOS_LRU_ENTRY_SET_IN_CACHE(old, false);
H
Haojun Liao 已提交
305
    ASSERT(shard->usage >= old->totalCharge);
306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324
    shard->usage -= old->totalCharge;

    taosArrayPush(deleted, &old);
  }
}

static void taosLRUCacheShardSetCapacity(SLRUCacheShard *shard, size_t capacity) {
  SArray *lastReferenceList = taosArrayInit(16, POINTER_BYTES);

  taosThreadMutexLock(&shard->mutex);

  shard->capacity = capacity;
  shard->highPriPoolCapacity = capacity * shard->highPriPoolRatio;
  taosLRUCacheShardEvictLRU(shard, 0, lastReferenceList);

  taosThreadMutexUnlock(&shard->mutex);

  for (int i = 0; i < taosArrayGetSize(lastReferenceList); ++i) {
    SLRUEntry *entry = taosArrayGetP(lastReferenceList, i);
M
Minglei Jin 已提交
325
    taosLRUEntryFree(entry);
326 327 328 329
  }
  taosArrayDestroy(lastReferenceList);
}

M
Minglei Jin 已提交
330 331
static int taosLRUCacheShardInit(SLRUCacheShard *shard, size_t capacity, bool strict, double highPriPoolRatio,
                                 int maxUpperHashBits) {
332 333 334 335 336 337
  if (taosLRUEntryTableInit(&shard->table, maxUpperHashBits) < 0) {
    return -1;
  }

  taosThreadMutexInit(&shard->mutex, NULL);

338
  taosThreadMutexLock(&shard->mutex);
339 340 341 342 343 344 345 346 347 348 349 350
  shard->capacity = 0;
  shard->highPriPoolUsage = 0;
  shard->strictCapacity = strict;
  shard->highPriPoolRatio = highPriPoolRatio;
  shard->highPriPoolCapacity = 0;

  shard->usage = 0;
  shard->lruUsage = 0;

  shard->lru.next = &shard->lru;
  shard->lru.prev = &shard->lru;
  shard->lruLowPri = &shard->lru;
351
  taosThreadMutexUnlock(&shard->mutex);
352 353 354 355 356 357 358 359 360 361 362 363

  taosLRUCacheShardSetCapacity(shard, capacity);

  return 0;
}

static void taosLRUCacheShardCleanup(SLRUCacheShard *shard) {
  taosThreadMutexDestroy(&shard->mutex);

  taosLRUEntryTableCleanup(&shard->table);
}

M
Minglei Jin 已提交
364 365
static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *e, LRUHandle **handle,
                                              bool freeOnFail) {
366
  LRUStatus status = TAOS_LRU_STATUS_OK;
M
Minglei Jin 已提交
367
  SArray   *lastReferenceList = taosArrayInit(16, POINTER_BYTES);
368 369 370 371

  taosThreadMutexLock(&shard->mutex);

  taosLRUCacheShardEvictLRU(shard, e->totalCharge, lastReferenceList);
M
Minglei Jin 已提交
372

373 374 375 376 377 378
  if (shard->usage + e->totalCharge > shard->capacity && (shard->strictCapacity || handle == NULL)) {
    TAOS_LRU_ENTRY_SET_IN_CACHE(e, false);
    if (handle == NULL) {
      taosArrayPush(lastReferenceList, &e);
    } else {
      if (freeOnFail) {
M
Minglei Jin 已提交
379
        taosMemoryFree(e);
380

M
Minglei Jin 已提交
381
        *handle = NULL;
382 383 384 385 386 387 388 389 390 391
      }

      status = TAOS_LRU_STATUS_INCOMPLETE;
    }
  } else {
    SLRUEntry *old = taosLRUEntryTableInsert(&shard->table, e);
    shard->usage += e->totalCharge;
    if (old != NULL) {
      status = TAOS_LRU_STATUS_OK_OVERWRITTEN;

H
Haojun Liao 已提交
392
      ASSERT(TAOS_LRU_ENTRY_IN_CACHE(old));
393
      TAOS_LRU_ENTRY_SET_IN_CACHE(old, false);
M
Minglei Jin 已提交
394
      if (!TAOS_LRU_ENTRY_HAS_REFS(old)) {
M
Minglei Jin 已提交
395
        taosLRUCacheShardLRURemove(shard, old);
H
Haojun Liao 已提交
396
        ASSERT(shard->usage >= old->totalCharge);
M
Minglei Jin 已提交
397
        shard->usage -= old->totalCharge;
398

M
Minglei Jin 已提交
399
        taosArrayPush(lastReferenceList, &old);
400 401 402 403 404 405
      }
    }
    if (handle == NULL) {
      taosLRUCacheShardLRUInsert(shard, e);
    } else {
      if (!TAOS_LRU_ENTRY_HAS_REFS(e)) {
M
Minglei Jin 已提交
406
        TAOS_LRU_ENTRY_REF(e);
407 408
      }

M
Minglei Jin 已提交
409
      *handle = (LRUHandle *)e;
410 411 412 413 414 415 416 417
    }
  }

  taosThreadMutexUnlock(&shard->mutex);

  for (int i = 0; i < taosArrayGetSize(lastReferenceList); ++i) {
    SLRUEntry *entry = taosArrayGetP(lastReferenceList, i);

M
Minglei Jin 已提交
418
    taosLRUEntryFree(entry);
419 420 421 422 423 424 425
  }
  taosArrayDestroy(lastReferenceList);

  return status;
}

static LRUStatus taosLRUCacheShardInsert(SLRUCacheShard *shard, const void *key, size_t keyLen, uint32_t hash,
M
Minglei Jin 已提交
426 427
                                         void *value, size_t charge, _taos_lru_deleter_t deleter, LRUHandle **handle,
                                         LRUPriority priority) {
428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455
  SLRUEntry *e = taosMemoryCalloc(1, sizeof(SLRUEntry) - 1 + keyLen);
  if (!e) {
    return TAOS_LRU_STATUS_FAIL;
  }

  e->value = value;
  e->flags = 0;
  e->deleter = deleter;
  e->keyLength = keyLen;
  e->hash = hash;
  e->refs = 0;
  e->next = e->prev = NULL;
  TAOS_LRU_ENTRY_SET_IN_CACHE(e, true);

  TAOS_LRU_ENTRY_SET_PRIORITY(e, priority);
  memcpy(e->keyData, key, keyLen);
  // TODO: e->CalcTotalCharge(charge, metadataChargePolicy);
  e->totalCharge = charge;

  return taosLRUCacheShardInsertEntry(shard, e, handle, true);
}

static LRUHandle *taosLRUCacheShardLookup(SLRUCacheShard *shard, const void *key, size_t keyLen, uint32_t hash) {
  SLRUEntry *e = NULL;

  taosThreadMutexLock(&shard->mutex);
  e = taosLRUEntryTableLookup(&shard->table, key, keyLen, hash);
  if (e != NULL) {
H
Haojun Liao 已提交
456
    ASSERT(TAOS_LRU_ENTRY_IN_CACHE(e));
457 458 459 460 461 462 463 464 465
    if (!TAOS_LRU_ENTRY_HAS_REFS(e)) {
      taosLRUCacheShardLRURemove(shard, e);
    }
    TAOS_LRU_ENTRY_REF(e);
    TAOS_LRU_ENTRY_SET_HIT(e);
  }

  taosThreadMutexUnlock(&shard->mutex);

M
Minglei Jin 已提交
466
  return (LRUHandle *)e;
467 468 469 470 471 472 473 474
}

static void taosLRUCacheShardErase(SLRUCacheShard *shard, const void *key, size_t keyLen, uint32_t hash) {
  bool lastReference = false;
  taosThreadMutexLock(&shard->mutex);

  SLRUEntry *e = taosLRUEntryTableRemove(&shard->table, key, keyLen, hash);
  if (e != NULL) {
H
Haojun Liao 已提交
475
    ASSERT(TAOS_LRU_ENTRY_IN_CACHE(e));
476 477 478 479
    TAOS_LRU_ENTRY_SET_IN_CACHE(e, false);
    if (!TAOS_LRU_ENTRY_HAS_REFS(e)) {
      taosLRUCacheShardLRURemove(shard, e);

H
Haojun Liao 已提交
480
      ASSERT(shard->usage >= e->totalCharge);
481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499
      shard->usage -= e->totalCharge;
      lastReference = true;
    }
  }

  taosThreadMutexUnlock(&shard->mutex);

  if (lastReference) {
    taosLRUEntryFree(e);
  }
}

static void taosLRUCacheShardEraseUnrefEntries(SLRUCacheShard *shard) {
  SArray *lastReferenceList = taosArrayInit(16, POINTER_BYTES);

  taosThreadMutexLock(&shard->mutex);

  while (shard->lru.next != &shard->lru) {
    SLRUEntry *old = shard->lru.next;
H
Haojun Liao 已提交
500
    ASSERT(TAOS_LRU_ENTRY_IN_CACHE(old) && !TAOS_LRU_ENTRY_HAS_REFS(old));
501 502 503
    taosLRUCacheShardLRURemove(shard, old);
    taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash);
    TAOS_LRU_ENTRY_SET_IN_CACHE(old, false);
H
Haojun Liao 已提交
504
    ASSERT(shard->usage >= old->totalCharge);
505
    shard->usage -= old->totalCharge;
M
Minglei Jin 已提交
506

507 508 509 510 511 512 513 514
    taosArrayPush(lastReferenceList, &old);
  }

  taosThreadMutexUnlock(&shard->mutex);

  for (int i = 0; i < taosArrayGetSize(lastReferenceList); ++i) {
    SLRUEntry *entry = taosArrayGetP(lastReferenceList, i);

M
Minglei Jin 已提交
515
    taosLRUEntryFree(entry);
516 517 518 519 520 521
  }

  taosArrayDestroy(lastReferenceList);
}

static bool taosLRUCacheShardRef(SLRUCacheShard *shard, LRUHandle *handle) {
M
Minglei Jin 已提交
522
  SLRUEntry *e = (SLRUEntry *)handle;
523 524
  taosThreadMutexLock(&shard->mutex);

H
Haojun Liao 已提交
525
  ASSERT(TAOS_LRU_ENTRY_HAS_REFS(e));
526 527 528 529 530 531 532 533 534 535 536 537
  TAOS_LRU_ENTRY_REF(e);

  taosThreadMutexUnlock(&shard->mutex);

  return true;
}

static bool taosLRUCacheShardRelease(SLRUCacheShard *shard, LRUHandle *handle, bool eraseIfLastRef) {
  if (handle == NULL) {
    return false;
  }

M
Minglei Jin 已提交
538 539
  SLRUEntry *e = (SLRUEntry *)handle;
  bool       lastReference = false;
540 541 542 543 544 545

  taosThreadMutexLock(&shard->mutex);

  lastReference = taosLRUEntryUnref(e);
  if (lastReference && TAOS_LRU_ENTRY_IN_CACHE(e)) {
    if (shard->usage > shard->capacity || eraseIfLastRef) {
H
Haojun Liao 已提交
546
      ASSERT(shard->lru.next == &shard->lru || eraseIfLastRef);
547 548 549 550 551 552 553 554 555 556 557

      taosLRUEntryTableRemove(&shard->table, e->keyData, e->keyLength, e->hash);
      TAOS_LRU_ENTRY_SET_IN_CACHE(e, false);
    } else {
      taosLRUCacheShardLRUInsert(shard, e);

      lastReference = false;
    }
  }

  if (lastReference && e->value) {
H
Haojun Liao 已提交
558
    ASSERT(shard->usage >= e->totalCharge);
559 560
    shard->usage -= e->totalCharge;
  }
M
Minglei Jin 已提交
561

562 563 564 565 566 567 568 569 570 571 572
  taosThreadMutexUnlock(&shard->mutex);

  if (lastReference) {
    taosLRUEntryFree(e);
  }

  return lastReference;
}

static size_t taosLRUCacheShardGetUsage(SLRUCacheShard *shard) {
  size_t usage = 0;
M
Minglei Jin 已提交
573

574 575 576 577 578 579 580
  taosThreadMutexLock(&shard->mutex);
  usage = shard->usage;
  taosThreadMutexUnlock(&shard->mutex);

  return usage;
}

581 582 583 584 585 586 587 588 589 590
static int32_t taosLRUCacheShardGetElems(SLRUCacheShard *shard) {
  int32_t elems = 0;

  taosThreadMutexLock(&shard->mutex);
  elems = shard->table.elems;
  taosThreadMutexUnlock(&shard->mutex);

  return elems;
}

591 592
static size_t taosLRUCacheShardGetPinnedUsage(SLRUCacheShard *shard) {
  size_t usage = 0;
M
Minglei Jin 已提交
593

594 595
  taosThreadMutexLock(&shard->mutex);

H
Haojun Liao 已提交
596
  ASSERT(shard->usage >= shard->lruUsage);
597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612
  usage = shard->usage - shard->lruUsage;

  taosThreadMutexUnlock(&shard->mutex);

  return usage;
}

static void taosLRUCacheShardSetStrictCapacity(SLRUCacheShard *shard, bool strict) {
  taosThreadMutexLock(&shard->mutex);

  shard->strictCapacity = strict;

  taosThreadMutexUnlock(&shard->mutex);
}

struct SShardedCache {
M
Minglei Jin 已提交
613 614 615 616 617
  uint32_t      shardMask;
  TdThreadMutex capacityMutex;
  size_t        capacity;
  bool          strictCapacity;
  uint64_t      lastId;  // atomic var for last id
618 619 620 621 622 623 624 625 626
};

struct SLRUCache {
  SShardedCache   shardedCache;
  SLRUCacheShard *shards;
  int             numShards;
};

static int getDefaultCacheShardBits(size_t capacity) {
M
Minglei Jin 已提交
627
  int    numShardBits = 0;
628 629 630 631 632 633 634 635 636 637 638 639 640
  size_t minShardSize = 512 * 1024;
  size_t numShards = capacity / minShardSize;
  while (numShards >>= 1) {
    if (++numShardBits >= 6) {
      return numShardBits;
    }
  }

  return numShardBits;
}

SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoolRatio) {
  if (numShardBits >= 20) {
641
    terrno = TSDB_CODE_INVALID_PARA;
642 643 644
    return NULL;
  }
  if (highPriPoolRatio < 0.0 || highPriPoolRatio > 1.0) {
645
    terrno = TSDB_CODE_INVALID_PARA;
646 647 648 649
    return NULL;
  }
  SLRUCache *cache = taosMemoryCalloc(1, sizeof(SLRUCache));
  if (!cache) {
650
    terrno = TSDB_CODE_OUT_OF_MEMORY;
651 652 653 654 655 656 657
    return NULL;
  }

  if (numShardBits < 0) {
    numShardBits = getDefaultCacheShardBits(capacity);
  }

M
Minglei Jin 已提交
658
  int numShards = 1 << numShardBits;
659 660 661
  cache->shards = taosMemoryCalloc(numShards, sizeof(SLRUCacheShard));
  if (!cache->shards) {
    taosMemoryFree(cache);
662
    terrno = TSDB_CODE_OUT_OF_MEMORY;
663 664 665
    return NULL;
  }

M
Minglei Jin 已提交
666
  bool   strictCapacity = 1;
667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687
  size_t perShard = (capacity + (numShards - 1)) / numShards;
  for (int i = 0; i < numShards; ++i) {
    taosLRUCacheShardInit(&cache->shards[i], perShard, strictCapacity, highPriPoolRatio, 32 - numShardBits);
  }

  cache->numShards = numShards;

  cache->shardedCache.shardMask = (1 << numShardBits) - 1;
  cache->shardedCache.strictCapacity = strictCapacity;
  cache->shardedCache.capacity = capacity;
  cache->shardedCache.lastId = 1;

  taosThreadMutexInit(&cache->shardedCache.capacityMutex, NULL);

  return cache;
}

void taosLRUCacheCleanup(SLRUCache *cache) {
  if (cache) {
    if (cache->shards) {
      int numShards = cache->numShards;
H
Haojun Liao 已提交
688
      ASSERT(numShards > 0);
689
      for (int i = 0; i < numShards; ++i) {
M
Minglei Jin 已提交
690
        taosLRUCacheShardCleanup(&cache->shards[i]);
691 692 693 694 695 696 697 698 699 700 701 702
      }
      taosMemoryFree(cache->shards);
      cache->shards = 0;
    }

    taosThreadMutexDestroy(&cache->shardedCache.capacityMutex);

    taosMemoryFree(cache);
  }
}

LRUStatus taosLRUCacheInsert(SLRUCache *cache, const void *key, size_t keyLen, void *value, size_t charge,
M
Minglei Jin 已提交
703
                             _taos_lru_deleter_t deleter, LRUHandle **handle, LRUPriority priority) {
704 705 706
  uint32_t hash = TAOS_LRU_CACHE_SHARD_HASH32(key, keyLen);
  uint32_t shardIndex = hash & cache->shardedCache.shardMask;

M
Minglei Jin 已提交
707 708
  return taosLRUCacheShardInsert(&cache->shards[shardIndex], key, keyLen, hash, value, charge, deleter, handle,
                                 priority);
709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736
}

LRUHandle *taosLRUCacheLookup(SLRUCache *cache, const void *key, size_t keyLen) {
  uint32_t hash = TAOS_LRU_CACHE_SHARD_HASH32(key, keyLen);
  uint32_t shardIndex = hash & cache->shardedCache.shardMask;

  return taosLRUCacheShardLookup(&cache->shards[shardIndex], key, keyLen, hash);
}

void taosLRUCacheErase(SLRUCache *cache, const void *key, size_t keyLen) {
  uint32_t hash = TAOS_LRU_CACHE_SHARD_HASH32(key, keyLen);
  uint32_t shardIndex = hash & cache->shardedCache.shardMask;

  return taosLRUCacheShardErase(&cache->shards[shardIndex], key, keyLen, hash);
}

void taosLRUCacheEraseUnrefEntries(SLRUCache *cache) {
  int numShards = cache->numShards;
  for (int i = 0; i < numShards; ++i) {
    taosLRUCacheShardEraseUnrefEntries(&cache->shards[i]);
  }
}

bool taosLRUCacheRef(SLRUCache *cache, LRUHandle *handle) {
  if (handle == NULL) {
    return false;
  }

M
Minglei Jin 已提交
737
  uint32_t hash = ((SLRUEntry *)handle)->hash;
738 739 740 741 742 743 744 745 746 747
  uint32_t shardIndex = hash & cache->shardedCache.shardMask;

  return taosLRUCacheShardRef(&cache->shards[shardIndex], handle);
}

bool taosLRUCacheRelease(SLRUCache *cache, LRUHandle *handle, bool eraseIfLastRef) {
  if (handle == NULL) {
    return false;
  }

M
Minglei Jin 已提交
748
  uint32_t hash = ((SLRUEntry *)handle)->hash;
749 750 751 752 753
  uint32_t shardIndex = hash & cache->shardedCache.shardMask;

  return taosLRUCacheShardRelease(&cache->shards[shardIndex], handle, eraseIfLastRef);
}

M
Minglei Jin 已提交
754
void *taosLRUCacheValue(SLRUCache *cache, LRUHandle *handle) { return ((SLRUEntry *)handle)->value; }
755 756 757 758 759 760 761 762 763 764 765

size_t taosLRUCacheGetUsage(SLRUCache *cache) {
  size_t usage = 0;

  for (int i = 0; i < cache->numShards; ++i) {
    usage += taosLRUCacheShardGetUsage(&cache->shards[i]);
  }

  return usage;
}

766 767 768 769 770 771 772 773 774 775
int32_t taosLRUCacheGetElems(SLRUCache *cache) {
  int32_t elems = 0;

  for (int i = 0; i < cache->numShards; ++i) {
    elems += taosLRUCacheShardGetElems(&cache->shards[i]);
  }

  return elems;
}

776 777 778 779 780 781 782 783 784 785 786 787
size_t taosLRUCacheGetPinnedUsage(SLRUCache *cache) {
  size_t usage = 0;

  for (int i = 0; i < cache->numShards; ++i) {
    usage += taosLRUCacheShardGetPinnedUsage(&cache->shards[i]);
  }

  return usage;
}

void taosLRUCacheSetCapacity(SLRUCache *cache, size_t capacity) {
  uint32_t numShards = cache->numShards;
M
Minglei Jin 已提交
788
  size_t   perShard = (capacity + (numShards - 1)) / numShards;
789 790 791 792 793 794 795 796

  taosThreadMutexLock(&cache->shardedCache.capacityMutex);

  for (int i = 0; i < numShards; ++i) {
    taosLRUCacheShardSetCapacity(&cache->shards[i], perShard);
  }

  cache->shardedCache.capacity = capacity;
M
Minglei Jin 已提交
797

798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822
  taosThreadMutexUnlock(&cache->shardedCache.capacityMutex);
}

size_t taosLRUCacheGetCapacity(SLRUCache *cache) {
  size_t capacity = 0;

  taosThreadMutexLock(&cache->shardedCache.capacityMutex);

  capacity = cache->shardedCache.capacity;

  taosThreadMutexUnlock(&cache->shardedCache.capacityMutex);

  return capacity;
}

void taosLRUCacheSetStrictCapacity(SLRUCache *cache, bool strict) {
  uint32_t numShards = cache->numShards;

  taosThreadMutexLock(&cache->shardedCache.capacityMutex);

  for (int i = 0; i < numShards; ++i) {
    taosLRUCacheShardSetStrictCapacity(&cache->shards[i], strict);
  }

  cache->shardedCache.strictCapacity = strict;
M
Minglei Jin 已提交
823

824 825 826 827 828 829 830 831 832 833 834 835 836 837
  taosThreadMutexUnlock(&cache->shardedCache.capacityMutex);
}

bool taosLRUCacheIsStrictCapacity(SLRUCache *cache) {
  bool strict = false;

  taosThreadMutexLock(&cache->shardedCache.capacityMutex);

  strict = cache->shardedCache.strictCapacity;

  taosThreadMutexUnlock(&cache->shardedCache.capacityMutex);

  return strict;
}