metaCache.c 27.1 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#include "meta.h"

17
// length in bytes of a composed LRU key: hash table address (8) + suid (8) + MD5 digest (16)
#define TAG_FILTER_RES_KEY_LEN  32
// initial bucket count of the table entry cache; the cache is only shrunk while it is larger than this
#define META_CACHE_BASE_BUCKET  1024
// initial bucket count of the super table stats cache; the cache is only shrunk while it is larger than this
#define META_CACHE_STATS_BUCKET 16
H
Hongze Cheng 已提交
20 21 22 23 24 25 26 27 28 29

// (uid , suid) : child table
// (uid,     0) : normal table
// (suid, suid) : super table
// One node of a bucket chain in the table entry cache. Entries are looked up by info.uid.
typedef struct SMetaCacheEntry SMetaCacheEntry;
struct SMetaCacheEntry {
  SMetaCacheEntry* next;  // next entry in the same bucket, NULL at the end of the chain
  SMetaInfo        info;  // cached table info, stored by value
};

30 31 32 33 34
// One node of a bucket chain in the super table stats cache. Entries are looked up by info.uid.
typedef struct SMetaStbStatsEntry {
  struct SMetaStbStatsEntry* next;  // next entry in the same bucket, NULL at the end of the chain
  SMetaStbStats              info;  // cached stats, stored by value
} SMetaStbStatsEntry;

35
typedef struct STagFilterResEntry {
X
Xiaoyu Wang 已提交
36
  SList    list;      // the linked list of md5 digest, extracted from the serialized tag query condition
37
  uint32_t hitTimes;  // queried times for current super table
38 39
} STagFilterResEntry;

H
Hongze Cheng 已提交
40
struct SMetaCache {
41 42 43 44 45 46
  // child, normal, super, table entry cache
  struct SEntryCache {
    int32_t           nEntry;
    int32_t           nBucket;
    SMetaCacheEntry** aBucket;
  } sEntryCache;
47 48 49 50 51 52 53 54 55

  // stable stats cache
  struct SStbStatsCache {
    int32_t              nEntry;
    int32_t              nBucket;
    SMetaStbStatsEntry** aBucket;
  } sStbStatsCache;

  // query cache
56
  struct STagFilterResCache {
H
Haojun Liao 已提交
57
    TdThreadMutex lock;
X
Xiaoyu Wang 已提交
58
    uint32_t      accTimes;
59 60
    SHashObj*     pTableEntry;
    SLRUCache*    pUidResCache;
61
  } sTagFilterResCache;
D
dapan1121 已提交
62 63 64 65 66 67 68

  struct STbGroupResCache {
    TdThreadMutex lock;
    uint32_t      accTimes;
    SHashObj*     pTableEntry;
    SLRUCache*    pResCache;
  } STbGroupResCache;
H
Hongze Cheng 已提交
69 70
};

71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
// Free every chained entry of the table entry cache and then the bucket array itself.
// Does nothing when pMeta has no cache attached. The SMetaCache object is not freed here.
static void entryCacheClose(SMeta* pMeta) {
  SMetaCache* pCache = pMeta->pCache;
  if (pCache == NULL) {
    return;
  }

  for (int32_t i = 0; i < pCache->sEntryCache.nBucket; i++) {
    SMetaCacheEntry* pCur = pCache->sEntryCache.aBucket[i];
    while (pCur != NULL) {
      // remember the successor before the node is released
      SMetaCacheEntry* pNext = pCur->next;
      taosMemoryFree(pCur);
      pCur = pNext;
    }
  }

  taosMemoryFree(pCache->sEntryCache.aBucket);
}

// Free every chained entry of the super table stats cache and then the bucket array itself.
// Does nothing when pMeta has no cache attached. The SMetaCache object is not freed here.
static void statsCacheClose(SMeta* pMeta) {
  SMetaCache* pCache = pMeta->pCache;
  if (pCache == NULL) {
    return;
  }

  for (int32_t i = 0; i < pCache->sStbStatsCache.nBucket; i++) {
    SMetaStbStatsEntry* pCur = pCache->sStbStatsCache.aBucket[i];
    while (pCur != NULL) {
      // remember the successor before the node is released
      SMetaStbStatsEntry* pNext = pCur->next;
      taosMemoryFree(pCur);
      pCur = pNext;
    }
  }

  taosMemoryFree(pCache->sStbStatsCache.aBucket);
}

H
Haojun Liao 已提交
101 102 103 104 105 106
static void freeCacheEntryFp(void* param) {
  STagFilterResEntry** p = param;
  tdListEmpty(&(*p)->list);
  taosMemoryFreeClear(*p);
}

H
Hongze Cheng 已提交
107 108 109 110 111 112 113 114 115 116
// Allocate the meta cache and all of its sub-caches, then attach it to pMeta->pCache.
//
// Returns 0 on success. On any allocation failure returns TSDB_CODE_OUT_OF_MEMORY, logs the error,
// and leaves pMeta->pCache untouched. Everything created before the failing step is released
// through pCache itself: pMeta->pCache is only assigned on success, so it cannot be used for cleanup.
int32_t metaCacheOpen(SMeta* pMeta) {
  int32_t     code = 0;
  SMetaCache* pCache = NULL;

  pCache = (SMetaCache*)taosMemoryMalloc(sizeof(SMetaCache));
  if (pCache == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  // open entry cache
  pCache->sEntryCache.nEntry = 0;
  pCache->sEntryCache.nBucket = META_CACHE_BASE_BUCKET;
  pCache->sEntryCache.aBucket =
      (SMetaCacheEntry**)taosMemoryCalloc(pCache->sEntryCache.nBucket, sizeof(SMetaCacheEntry*));
  if (pCache->sEntryCache.aBucket == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  // open stats cache
  pCache->sStbStatsCache.nEntry = 0;
  pCache->sStbStatsCache.nBucket = META_CACHE_STATS_BUCKET;
  pCache->sStbStatsCache.aBucket =
      (SMetaStbStatsEntry**)taosMemoryCalloc(pCache->sStbStatsCache.nBucket, sizeof(SMetaStbStatsEntry*));
  if (pCache->sStbStatsCache.aBucket == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _errEntry;
  }

  // open tag filter result cache
  pCache->sTagFilterResCache.pUidResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
  if (pCache->sTagFilterResCache.pUidResCache == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _errStats;
  }

  pCache->sTagFilterResCache.accTimes = 0;
  pCache->sTagFilterResCache.pTableEntry =
      taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
  if (pCache->sTagFilterResCache.pTableEntry == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _errTagLru;
  }

  taosHashSetFreeFp(pCache->sTagFilterResCache.pTableEntry, freeCacheEntryFp);
  taosThreadMutexInit(&pCache->sTagFilterResCache.lock, NULL);

  // open table group result cache
  pCache->STbGroupResCache.pResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
  if (pCache->STbGroupResCache.pResCache == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _errTagHash;
  }

  pCache->STbGroupResCache.accTimes = 0;
  pCache->STbGroupResCache.pTableEntry =
      taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
  if (pCache->STbGroupResCache.pTableEntry == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _errGroupLru;
  }

  taosHashSetFreeFp(pCache->STbGroupResCache.pTableEntry, freeCacheEntryFp);
  taosThreadMutexInit(&pCache->STbGroupResCache.lock, NULL);

  pMeta->pCache = pCache;
  return code;

  // cleanup chain: each label undoes the steps completed before the failing one, in reverse order
_errGroupLru:
  taosLRUCacheCleanup(pCache->STbGroupResCache.pResCache);

_errTagHash:
  taosThreadMutexDestroy(&pCache->sTagFilterResCache.lock);
  taosHashCleanup(pCache->sTagFilterResCache.pTableEntry);

_errTagLru:
  taosLRUCacheCleanup(pCache->sTagFilterResCache.pUidResCache);

_errStats:
  // the bucket arrays hold no entries yet, so freeing the arrays is sufficient
  taosMemoryFree(pCache->sStbStatsCache.aBucket);

_errEntry:
  taosMemoryFree(pCache->sEntryCache.aBucket);

_err:
  taosMemoryFree(pCache);
  metaError("vgId:%d, meta open cache failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
  return code;
}

// Release everything owned by pMeta->pCache and reset the pointer to NULL.
// Does nothing when no cache is attached.
void metaCacheClose(SMeta* pMeta) {
  if (pMeta->pCache) {
    entryCacheClose(pMeta);
    statsCacheClose(pMeta);

    // tag filter result cache
    taosLRUCacheCleanup(pMeta->pCache->sTagFilterResCache.pUidResCache);
    taosThreadMutexDestroy(&pMeta->pCache->sTagFilterResCache.lock);
    taosHashCleanup(pMeta->pCache->sTagFilterResCache.pTableEntry);

    // table group result cache
    taosLRUCacheCleanup(pMeta->pCache->STbGroupResCache.pResCache);
    taosThreadMutexDestroy(&pMeta->pCache->STbGroupResCache.lock);
    taosHashCleanup(pMeta->pCache->STbGroupResCache.pTableEntry);

    taosMemoryFree(pMeta->pCache);
    pMeta->pCache = NULL;
  }
}

// Resize the bucket array of the table entry cache and redistribute all entries.
//
// expand != 0 doubles the bucket count, expand == 0 halves it. Entries are relinked, not copied.
// Returns 0 on success, or TSDB_CODE_OUT_OF_MEMORY if the new bucket array cannot be allocated,
// in which case the cache is left unchanged.
static int32_t metaRehashCache(SMetaCache* pCache, int8_t expand) {
  int32_t code = 0;
  int32_t nBucket;

  if (expand) {
    nBucket = pCache->sEntryCache.nBucket * 2;
  } else {
    nBucket = pCache->sEntryCache.nBucket / 2;
  }

  SMetaCacheEntry** aBucket = (SMetaCacheEntry**)taosMemoryCalloc(nBucket, sizeof(SMetaCacheEntry*));
  if (aBucket == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // rehash
  for (int32_t iBucket = 0; iBucket < pCache->sEntryCache.nBucket; iBucket++) {
    SMetaCacheEntry* pEntry = pCache->sEntryCache.aBucket[iBucket];

    while (pEntry) {
      SMetaCacheEntry* pTEntry = pEntry->next;

      // push the entry onto the head of its new chain
      pEntry->next = aBucket[TABS(pEntry->info.uid) % nBucket];
      aBucket[TABS(pEntry->info.uid) % nBucket] = pEntry;

      pEntry = pTEntry;
    }
  }

  // final set
  taosMemoryFree(pCache->sEntryCache.aBucket);
  pCache->sEntryCache.nBucket = nBucket;
  pCache->sEntryCache.aBucket = aBucket;

_exit:
  return code;
}

// Insert pInfo into the table entry cache, or refresh the entry that already exists for pInfo->uid.
//
// Update: the cached suid must equal pInfo->suid, otherwise TSDB_CODE_FAILED is returned; version
// and skmVer are overwritten only when pInfo->version is greater than the cached version.
// Insert: the bucket array is doubled first when nEntry has reached nBucket.
// Returns 0 on success or TSDB_CODE_OUT_OF_MEMORY when rehashing or allocating the entry fails.
int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo) {
  int32_t code = 0;

  // meta is wlocked for calling this func.

  // search
  SMetaCache*       pCache = pMeta->pCache;
  int32_t           iBucket = TABS(pInfo->uid) % pCache->sEntryCache.nBucket;
  SMetaCacheEntry** ppEntry = &pCache->sEntryCache.aBucket[iBucket];
  while (*ppEntry && (*ppEntry)->info.uid != pInfo->uid) {
    ppEntry = &(*ppEntry)->next;
  }

  if (*ppEntry) {  // update
    if (pInfo->suid != (*ppEntry)->info.suid) {
      metaError("meta/cache: suid should be same as the one in cache.");
      return TSDB_CODE_FAILED;
    }
    if (pInfo->version > (*ppEntry)->info.version) {
      (*ppEntry)->info.version = pInfo->version;
      (*ppEntry)->info.skmVer = pInfo->skmVer;
    }
  } else {  // insert
    if (pCache->sEntryCache.nEntry >= pCache->sEntryCache.nBucket) {
      code = metaRehashCache(pCache, 1);
      if (code) goto _exit;

      // the bucket count changed, so the slot has to be recomputed
      iBucket = TABS(pInfo->uid) % pCache->sEntryCache.nBucket;
    }

    SMetaCacheEntry* pEntryNew = (SMetaCacheEntry*)taosMemoryMalloc(sizeof(*pEntryNew));
    if (pEntryNew == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _exit;
    }

    pEntryNew->info = *pInfo;
    pEntryNew->next = pCache->sEntryCache.aBucket[iBucket];
    pCache->sEntryCache.aBucket[iBucket] = pEntryNew;
    pCache->sEntryCache.nEntry++;
  }

_exit:
  return code;
}

// Remove the entry for uid from the table entry cache.
//
// After removal the bucket array is halved when nEntry drops below a quarter of nBucket and
// nBucket is still larger than META_CACHE_BASE_BUCKET.
// Returns 0 on success, TSDB_CODE_NOT_FOUND when uid is not cached, or the rehash error code
// (the entry has already been removed in that case).
int32_t metaCacheDrop(SMeta* pMeta, int64_t uid) {
  int32_t code = 0;

  SMetaCache*       pCache = pMeta->pCache;
  int32_t           iBucket = TABS(uid) % pCache->sEntryCache.nBucket;
  SMetaCacheEntry** ppEntry = &pCache->sEntryCache.aBucket[iBucket];
  while (*ppEntry && (*ppEntry)->info.uid != uid) {
    ppEntry = &(*ppEntry)->next;
  }

  SMetaCacheEntry* pEntry = *ppEntry;
  if (pEntry) {
    // unlink through the pointer-to-link so head and interior nodes are handled alike
    *ppEntry = pEntry->next;
    taosMemoryFree(pEntry);
    pCache->sEntryCache.nEntry--;
    if (pCache->sEntryCache.nEntry < pCache->sEntryCache.nBucket / 4 &&
        pCache->sEntryCache.nBucket > META_CACHE_BASE_BUCKET) {
      code = metaRehashCache(pCache, 0);
      if (code) goto _exit;
    }
  } else {
    code = TSDB_CODE_NOT_FOUND;
  }

_exit:
  return code;
}

// Look up uid in the table entry cache and copy the cached info into *pInfo.
// Returns 0 on a hit, or TSDB_CODE_NOT_FOUND on a miss (*pInfo is left untouched).
int32_t metaCacheGet(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo) {
  int32_t code = 0;

  SMetaCache*      pCache = pMeta->pCache;
  int32_t          iBucket = TABS(uid) % pCache->sEntryCache.nBucket;
  SMetaCacheEntry* pEntry = pCache->sEntryCache.aBucket[iBucket];

  while (pEntry && pEntry->info.uid != uid) {
    pEntry = pEntry->next;
  }

  if (pEntry) {
    *pInfo = pEntry->info;
  } else {
    code = TSDB_CODE_NOT_FOUND;
  }

  return code;
}
335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377

// Resize the bucket array of the super table stats cache and redistribute all entries.
//
// expand != 0 doubles the bucket count, expand == 0 halves it. Entries are relinked, not copied.
// Returns 0 on success, or TSDB_CODE_OUT_OF_MEMORY if the new bucket array cannot be allocated,
// in which case the cache is left unchanged.
static int32_t metaRehashStatsCache(SMetaCache* pCache, int8_t expand) {
  int32_t oldCount = pCache->sStbStatsCache.nBucket;
  int32_t newCount = expand ? oldCount * 2 : oldCount / 2;

  SMetaStbStatsEntry** newBuckets = (SMetaStbStatsEntry**)taosMemoryCalloc(newCount, sizeof(SMetaStbStatsEntry*));
  if (newBuckets == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // move every node of every old chain onto the head of its new chain
  SMetaStbStatsEntry** oldBuckets = pCache->sStbStatsCache.aBucket;
  for (int32_t i = 0; i < oldCount; i++) {
    for (SMetaStbStatsEntry* pCur = oldBuckets[i]; pCur != NULL;) {
      SMetaStbStatsEntry* pNext = pCur->next;

      pCur->next = newBuckets[TABS(pCur->info.uid) % newCount];
      newBuckets[TABS(pCur->info.uid) % newCount] = pCur;

      pCur = pNext;
    }
  }

  // publish the new array
  taosMemoryFree(oldBuckets);
  pCache->sStbStatsCache.nBucket = newCount;
  pCache->sStbStatsCache.aBucket = newBuckets;

  return 0;
}

// Insert pInfo into the super table stats cache, or refresh the entry that already exists for
// pInfo->uid.
//
// Update: only ctbNum is overwritten.
// Insert: the bucket array is doubled first when nEntry has reached nBucket.
// Returns 0 on success or TSDB_CODE_OUT_OF_MEMORY when rehashing or allocating the entry fails.
int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo) {
  int32_t code = 0;

  // meta is wlocked for calling this func.

  // search
  SMetaCache*          pCache = pMeta->pCache;
  int32_t              iBucket = TABS(pInfo->uid) % pCache->sStbStatsCache.nBucket;
  SMetaStbStatsEntry** ppEntry = &pCache->sStbStatsCache.aBucket[iBucket];
  while (*ppEntry && (*ppEntry)->info.uid != pInfo->uid) {
    ppEntry = &(*ppEntry)->next;
  }

  if (*ppEntry) {  // update
    (*ppEntry)->info.ctbNum = pInfo->ctbNum;
  } else {  // insert
    if (pCache->sStbStatsCache.nEntry >= pCache->sStbStatsCache.nBucket) {
      code = metaRehashStatsCache(pCache, 1);
      if (code) goto _exit;

      // the bucket count changed, so the slot has to be recomputed
      iBucket = TABS(pInfo->uid) % pCache->sStbStatsCache.nBucket;
    }

    SMetaStbStatsEntry* pEntryNew = (SMetaStbStatsEntry*)taosMemoryMalloc(sizeof(*pEntryNew));
    if (pEntryNew == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _exit;
    }

    pEntryNew->info = *pInfo;
    pEntryNew->next = pCache->sStbStatsCache.aBucket[iBucket];
    pCache->sStbStatsCache.aBucket[iBucket] = pEntryNew;
    pCache->sStbStatsCache.nEntry++;
  }

_exit:
  return code;
}

// Remove the entry for uid from the super table stats cache.
//
// After removal the bucket array is halved when nEntry drops below a quarter of nBucket and
// nBucket is still larger than META_CACHE_STATS_BUCKET.
// Returns 0 on success, TSDB_CODE_NOT_FOUND when uid is not cached, or the rehash error code
// (the entry has already been removed in that case).
int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid) {
  SMetaCache* pCache = pMeta->pCache;
  int32_t     slot = TABS(uid) % pCache->sStbStatsCache.nBucket;

  // walk the chain through the address of each link so unlinking needs no special case
  SMetaStbStatsEntry** ppLink = &pCache->sStbStatsCache.aBucket[slot];
  for (; *ppLink != NULL; ppLink = &(*ppLink)->next) {
    if ((*ppLink)->info.uid == uid) {
      break;
    }
  }

  SMetaStbStatsEntry* pVictim = *ppLink;
  if (pVictim == NULL) {
    return TSDB_CODE_NOT_FOUND;
  }

  *ppLink = pVictim->next;
  taosMemoryFree(pVictim);
  pCache->sStbStatsCache.nEntry--;

  if (pCache->sStbStatsCache.nBucket > META_CACHE_STATS_BUCKET &&
      pCache->sStbStatsCache.nEntry < pCache->sStbStatsCache.nBucket / 4) {
    return metaRehashStatsCache(pCache, 0);
  }

  return 0;
}

// Look up uid in the super table stats cache and copy the cached stats into *pInfo.
// Returns TSDB_CODE_SUCCESS on a hit, or TSDB_CODE_NOT_FOUND on a miss (*pInfo is left untouched).
int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) {
  SMetaCache* pCache = pMeta->pCache;
  int32_t     slot = TABS(uid) % pCache->sStbStatsCache.nBucket;

  for (SMetaStbStatsEntry* pCur = pCache->sStbStatsCache.aBucket[slot]; pCur != NULL; pCur = pCur->next) {
    if (pCur->info.uid == uid) {
      *pInfo = pCur->info;
      return TSDB_CODE_SUCCESS;
    }
  }

  return TSDB_CODE_NOT_FOUND;
}
461

X
Xiaoyu Wang 已提交
462 463
// Walk the digest list of pEntry and collect the list nodes whose key is no longer in pCache.
//
// For each node a three-word key {suid, digest} is built by copying keyLen bytes of node data
// behind the suid, and looked up with a fixed length of 24 bytes. Nodes that miss are pushed (as
// SListNode*) into pInvalidRes; handles that hit are released again. Always returns 0.
//
// NOTE(review): keyLen is not validated; anything larger than 16 overruns buf. The 24-byte key
// built here also differs from the 32-byte layout produced by initCacheKey() - confirm whether
// this helper is still in use.
static int checkAllEntriesInCache(const STagFilterResEntry* pEntry, SArray* pInvalidRes, int32_t keyLen,
                                  SLRUCache* pCache, uint64_t suid) {
  SListIter iter = {0};
  tdListInitIter((SList*)&(pEntry->list), &iter, TD_LIST_FORWARD);

  SListNode* pNode = NULL;
  uint64_t   buf[3];
  buf[0] = suid;

  int32_t len = sizeof(uint64_t) * tListLen(buf);

  while ((pNode = tdListNext(&iter)) != NULL) {
    memcpy(&buf[1], pNode->data, keyLen);

    // check whether it is existed in LRU cache, and remove it from linked list if not.
    LRUHandle* pRes = taosLRUCacheLookup(pCache, buf, len);
    if (pRes == NULL) {  // remove the item in the linked list
      taosArrayPush(pInvalidRes, &pNode);
    } else {
      taosLRUCacheRelease(pCache, pRes, false);
    }
  }

  return 0;
}

488 489 490 491 492 493 494 495 496 497 498 499 500 501
// Copy keyLen bytes of digest from key into the composed key buffer, starting at its third
// 64-bit word. The length is not checked here.
static FORCE_INLINE void setMD5DigestInKey(uint64_t* pBuf, const char* key, int32_t keyLen) {
  uint64_t* pDigestSlot = pBuf + 2;
  memcpy(pDigestSlot, key, keyLen);
}

// the format of key:
// hash table address(8bytes) + suid(8bytes) + MD5 digest(16bytes)
//
// buf must have room for four uint64_t words. The digest length is asserted BEFORE the copy, so
// that (when ASSERT is active) a wrong keyLen is reported before memcpy can write past buf.
static void initCacheKey(uint64_t* buf, const SHashObj* pHashMap, uint64_t suid, const char* key, int32_t keyLen) {
  ASSERT(keyLen == sizeof(uint64_t) * 2);
  buf[0] = (uint64_t) pHashMap;
  buf[1] = suid;
  setMD5DigestInKey(buf, key, keyLen);
}

502
// Fetch a cached tag filter result (uid list) for the given super table and condition digest.
//
// pVnode      SVnode whose meta cache is queried
// suid        super table uid
// pKey/keyLen MD5 digest of the serialized tag condition (keyLen is expected to be 16)
// pList1      array that receives the cached items on a hit
// acquireRes  set to 1 on a hit, 0 otherwise
//
// The cached payload is read as an int32_t item count followed by the items themselves.
// Returns TSDB_CODE_SUCCESS on both hit and miss. Returns TSDB_CODE_FAILED when the LRU cache has
// the key but the per-table bookkeeping entry is missing; the LRU handle is released and the lock
// is dropped before returning in that case as well.
int32_t metaGetCachedTableUidList(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
                                  bool* acquireRes) {
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
  int32_t vgId = TD_VID(pMeta->pVnode);

  // generate the composed key for LRU cache
  SLRUCache*     pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
  SHashObj*      pTableMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
  TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;

  *acquireRes = 0;
  uint64_t key[4];
  initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen);

  taosThreadMutexLock(pLock);
  pMeta->pCache->sTagFilterResCache.accTimes += 1;

  LRUHandle* pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN);
  if (pHandle == NULL) {
    taosThreadMutexUnlock(pLock);
    return TSDB_CODE_SUCCESS;
  }

  // do some book mark work after acquiring the filter result from cache
  STagFilterResEntry** pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
  if (NULL == pEntry) {
    metaError("meta/cache: pEntry should not be NULL.");
    // give back the handle and the lock, otherwise the next caller blocks forever
    taosLRUCacheRelease(pCache, pHandle, false);
    taosThreadMutexUnlock(pLock);
    return TSDB_CODE_FAILED;
  }

  *acquireRes = 1;

  const char* p = taosLRUCacheValue(pCache, pHandle);
  int32_t     size = *(int32_t*)p;

  // set the result into the buffer
  taosArrayAddBatch(pList1, p + sizeof(int32_t), size);

  (*pEntry)->hitTimes += 1;

  uint32_t acc = pMeta->pCache->sTagFilterResCache.accTimes;
  if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) {
    metaInfo("vgId:%d cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc, ((double)(*pEntry)->hitTimes) / acc);
  }

  taosLRUCacheRelease(pCache, pHandle, false);

  // unlock meta
  taosThreadMutexUnlock(pLock);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
554
static void freeUidCachePayload(const void* key, size_t keyLen, void* value) {
H
Haojun Liao 已提交
555 556 557
  if (value == NULL) {
    return;
  }
558 559 560

  const uint64_t* p = key;
  if (keyLen != sizeof(int64_t) * 4) {
561
    metaError("key length is invalid, length:%d, expect:%d", (int32_t)keyLen, (int32_t)sizeof(uint64_t) * 2);
562 563
    return;
  }
564

565 566
  SHashObj* pHashObj = (SHashObj*)p[0];

567
  STagFilterResEntry** pEntry = taosHashGet(pHashObj, &p[1], sizeof(uint64_t));
dengyihao's avatar
dengyihao 已提交
568

569
  if (pEntry != NULL && (*pEntry) != NULL) {
570
    int64_t st = taosGetTimestampUs();
H
Haojun Liao 已提交
571 572

    SListIter iter = {0};
573
    tdListInitIter((SList*)&((*pEntry)->list), &iter, TD_LIST_FORWARD);
H
Haojun Liao 已提交
574 575 576

    SListNode* pNode = NULL;
    while ((pNode = tdListNext(&iter)) != NULL) {
577 578
      uint64_t* digest = (uint64_t*)pNode->data;
      if (digest[0] == p[2] && digest[1] == p[3]) {
579 580
        void* tmp = tdListPopNode(&((*pEntry)->list), pNode);
        taosMemoryFree(tmp);
581

582 583 584
        double el = (taosGetTimestampUs() - st) / 1000.0;
        metaInfo("clear items in meta-cache, remain cached item:%d, elapsed time:%.2fms", listNEles(&((*pEntry)->list)),
                 el);
585
        break;
H
Haojun Liao 已提交
586 587
      }
    }
588 589
  }

H
Haojun Liao 已提交
590
  taosMemoryFree(value);
591 592
}

593 594 595 596
// Create the bookkeeping entry for a super table, seed its digest list with pKey, and register
// it in pTableEntry under suid.
//
// keyLen becomes the element size of the digest list. The list is filled before the entry is
// published in the hash table, so any failure can be unwound without leaving a half-built entry
// behind. Returns 0 on success or TSDB_CODE_OUT_OF_MEMORY on failure.
static int32_t addNewEntry(SHashObj* pTableEntry, const void* pKey, int32_t keyLen, uint64_t suid) {
  STagFilterResEntry* p = taosMemoryMalloc(sizeof(STagFilterResEntry));
  if (p == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  p->hitTimes = 0;
  tdListInit(&p->list, keyLen);

  if (tdListAppend(&p->list, pKey) != 0) {
    taosMemoryFree(p);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (taosHashPut(pTableEntry, &suid, sizeof(uint64_t), &p, POINTER_BYTES) != 0) {
    // the hash table did not take the entry, so it is still ours to release
    tdListEmpty(&p->list);
    taosMemoryFree(p);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return 0;
}

606
// check both the payload size and selectivity ratio
607
int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
dengyihao's avatar
dengyihao 已提交
608
                              int32_t payloadLen, double selectivityRatio) {
609
  int32_t code = 0;
610
  SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
611 612
  int32_t vgId = TD_VID(pMeta->pVnode);

H
Haojun Liao 已提交
613
  if (selectivityRatio > tsSelectivityRatio) {
H
Haojun Liao 已提交
614 615
    metaDebug("vgId:%d, suid:%" PRIu64
              " failed to add to uid list cache, due to selectivity ratio %.2f less than threshold %.2f",
616
              vgId, suid, selectivityRatio, tsSelectivityRatio);
H
Haojun Liao 已提交
617
    taosMemoryFree(pPayload);
H
Haojun Liao 已提交
618 619 620 621
    return TSDB_CODE_SUCCESS;
  }

  if (payloadLen > tsTagFilterResCacheSize) {
H
Haojun Liao 已提交
622 623
    metaDebug("vgId:%d, suid:%" PRIu64
              " failed to add to uid list cache, due to payload length %d greater than threshold %d",
624
              vgId, suid, payloadLen, tsTagFilterResCacheSize);
H
Haojun Liao 已提交
625
    taosMemoryFree(pPayload);
H
Haojun Liao 已提交
626 627 628
    return TSDB_CODE_SUCCESS;
  }

H
Haojun Liao 已提交
629 630 631
  SLRUCache*     pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
  SHashObj*      pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry;
  TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
H
Haojun Liao 已提交
632

633 634
  uint64_t key[4] = {0};
  initCacheKey(key, pTableEntry, suid, pKey, keyLen);
635

H
Haojun Liao 已提交
636
  taosThreadMutexLock(pLock);
H
Haojun Liao 已提交
637
  STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
H
Haojun Liao 已提交
638
  if (pEntry == NULL) {
639 640 641 642
    code = addNewEntry(pTableEntry, pKey, keyLen, suid);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }
643
  } else { // check if it exists or not
H
Haojun Liao 已提交
644 645 646
    size_t size = listNEles(&(*pEntry)->list);
    if (size == 0) {
      tdListAppend(&(*pEntry)->list, pKey);
647
    } else {
H
Haojun Liao 已提交
648
      SListNode* pNode = listHead(&(*pEntry)->list);
649
      uint64_t*  p = (uint64_t*)pNode->data;
H
Haojun Liao 已提交
650
      if (p[1] == ((uint64_t*)pKey)[1] && p[0] == ((uint64_t*)pKey)[0]) {
651
        // we have already found the existed items, no need to added to cache anymore.
652 653
        taosThreadMutexUnlock(pLock);
        return TSDB_CODE_SUCCESS;
654
      } else {  // not equal, append it
H
Haojun Liao 已提交
655 656 657
        tdListAppend(&(*pEntry)->list, pKey);
      }
    }
658
  }
H
Haojun Liao 已提交
659 660

  // add to cache.
D
dapan1121 已提交
661
  taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeUidCachePayload, NULL,
dengyihao's avatar
dengyihao 已提交
662
                     TAOS_LRU_PRIORITY_LOW);
X
Xiaoyu Wang 已提交
663
_end:
H
Haojun Liao 已提交
664
  taosThreadMutexUnlock(pLock);
665
  metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", vgId, suid,
dengyihao's avatar
dengyihao 已提交
666
            (int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
H
Haojun Liao 已提交
667

668
  return code;
669 670 671 672
}

// remove the lru cache that are expired due to the tags value update, or creating, or dropping, of child tables
// Drop every cached tag filter result that belongs to super table suid.
//
// Each digest recorded for the table is combined with the hash address and suid into a composed
// key and erased from the LRU cache; the digest list is then emptied and hitTimes reset.
// Always returns TSDB_CODE_SUCCESS; nothing is done when the table has no entry or an empty list.
int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
  uint64_t  p[4] = {0};
  int32_t   vgId = TD_VID(pMeta->pVnode);
  SHashObj* pEntryHashMap = pMeta->pCache->sTagFilterResCache.pTableEntry;

  // fill in the fixed part of the key; the digest words are overwritten per node below
  uint64_t dummy[2] = {0};
  initCacheKey(p, pEntryHashMap, suid, (char*) &dummy[0], 16);

  TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
  taosThreadMutexLock(pLock);

  STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
  if (pEntry == NULL || listNEles(&(*pEntry)->list) == 0) {
    taosThreadMutexUnlock(pLock);
    return TSDB_CODE_SUCCESS;
  }

  (*pEntry)->hitTimes = 0;

  SListIter iter = {0};
  tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD);

  SListNode* pNode = NULL;
  while ((pNode = tdListNext(&iter)) != NULL) {
    setMD5DigestInKey(p, pNode->data, 2 * sizeof(uint64_t));
    taosLRUCacheErase(pMeta->pCache->sTagFilterResCache.pUidResCache, p, TAG_FILTER_RES_KEY_LEN);
  }

  tdListEmpty(&(*pEntry)->list);
  taosThreadMutexUnlock(pLock);

  metaDebug("vgId:%d suid:%" PRIu64 " cached related tag filter uid list cleared", vgId, suid);
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
706

707 708
// Fetch a cached table group result for the given super table and digest.
//
// pVnode      SVnode whose meta cache is queried
// suid        super table uid
// pKey/keyLen MD5 digest identifying the grouping (keyLen is expected to be 16)
// pList       set to a duplicate (taosArrayDup) of the cached array on a hit, NULL otherwise
//
// Returns TSDB_CODE_SUCCESS on both hit and miss. Returns TSDB_CODE_FAILED when the LRU cache has
// the key but the per-table bookkeeping entry is missing; the LRU handle is released and the lock
// is dropped before returning in that case as well.
int32_t metaGetCachedTbGroup(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList) {
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
  int32_t vgId = TD_VID(pMeta->pVnode);

  // generate the composed key for LRU cache
  SLRUCache*     pCache = pMeta->pCache->STbGroupResCache.pResCache;
  SHashObj*      pTableMap = pMeta->pCache->STbGroupResCache.pTableEntry;
  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;

  *pList = NULL;
  uint64_t key[4];
  initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen);

  taosThreadMutexLock(pLock);
  pMeta->pCache->STbGroupResCache.accTimes += 1;

  LRUHandle* pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN);
  if (pHandle == NULL) {
    taosThreadMutexUnlock(pLock);
    return TSDB_CODE_SUCCESS;
  }

  STagFilterResEntry** pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
  if (NULL == pEntry) {
    metaDebug("suid %" PRIu64 " not in tb group cache", suid);
    // give back the handle and the lock, otherwise the next caller blocks forever
    taosLRUCacheRelease(pCache, pHandle, false);
    taosThreadMutexUnlock(pLock);
    return TSDB_CODE_FAILED;
  }

  *pList = taosArrayDup(taosLRUCacheValue(pCache, pHandle), NULL);

  (*pEntry)->hitTimes += 1;

  uint32_t acc = pMeta->pCache->STbGroupResCache.accTimes;
  if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) {
    metaInfo("vgId:%d tb group cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc, ((double)(*pEntry)->hitTimes) / acc);
  }

  taosLRUCacheRelease(pCache, pHandle, false);

  // unlock meta
  taosThreadMutexUnlock(pLock);
  return TSDB_CODE_SUCCESS;
}


// Deleter passed to taosLRUCacheInsert() for table group results.
//
// key is the composed key built by initCacheKey(): word 0 is the address of the pTableEntry hash,
// word 1 the suid, words 2-3 the digest. The matching digest node is removed from the per-table
// list and the payload array is destroyed. The payload is destroyed even when the key length is
// unexpected, so that it is not leaked; only the list bookkeeping is skipped in that case.
static void freeTbGroupCachePayload(const void* key, size_t keyLen, void* value) {
  if (value == NULL) {
    return;
  }

  const uint64_t* p = key;
  if (keyLen != sizeof(int64_t) * 4) {
    metaError("tb group key length is invalid, length:%d, expect:%d", (int32_t)keyLen, (int32_t)sizeof(uint64_t) * 4);
    taosArrayDestroy((SArray*)value);
    return;
  }

  SHashObj* pHashObj = (SHashObj*)p[0];

  STagFilterResEntry** pEntry = taosHashGet(pHashObj, &p[1], sizeof(uint64_t));

  if (pEntry != NULL && (*pEntry) != NULL) {
    int64_t st = taosGetTimestampUs();

    SListIter iter = {0};
    tdListInitIter((SList*)&((*pEntry)->list), &iter, TD_LIST_FORWARD);

    SListNode* pNode = NULL;
    while ((pNode = tdListNext(&iter)) != NULL) {
      uint64_t* digest = (uint64_t*)pNode->data;
      if (digest[0] == p[2] && digest[1] == p[3]) {
        void* tmp = tdListPopNode(&((*pEntry)->list), pNode);
        taosMemoryFree(tmp);

        double el = (taosGetTimestampUs() - st) / 1000.0;
        metaDebug("clear one item in tb group cache, remain cached item:%d, elapsed time:%.2fms", listNEles(&((*pEntry)->list)),
                 el);
        break;
      }
    }
  }

  taosArrayDestroy((SArray*)value);
}


792
int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
D
dapan1121 已提交
793 794
                              int32_t payloadLen) {
  int32_t code = 0;
795
  SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
D
dapan1121 已提交
796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884
  int32_t vgId = TD_VID(pMeta->pVnode);

  if (payloadLen > tsTagFilterResCacheSize) {
    metaDebug("vgId:%d, suid:%" PRIu64
              " ignore to add to tb group cache, due to payload length %d greater than threshold %d",
              vgId, suid, payloadLen, tsTagFilterResCacheSize);
    taosArrayDestroy((SArray*)pPayload);
    return TSDB_CODE_SUCCESS;
  }

  SLRUCache*     pCache = pMeta->pCache->STbGroupResCache.pResCache;
  SHashObj*      pTableEntry = pMeta->pCache->STbGroupResCache.pTableEntry;
  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;

  uint64_t key[4] = {0};
  initCacheKey(key, pTableEntry, suid, pKey, keyLen);

  taosThreadMutexLock(pLock);
  STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
  if (pEntry == NULL) {
    code = addNewEntry(pTableEntry, pKey, keyLen, suid);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }
  } else { // check if it exists or not
    size_t size = listNEles(&(*pEntry)->list);
    if (size == 0) {
      tdListAppend(&(*pEntry)->list, pKey);
    } else {
      SListNode* pNode = listHead(&(*pEntry)->list);
      uint64_t*  p = (uint64_t*)pNode->data;
      if (p[1] == ((uint64_t*)pKey)[1] && p[0] == ((uint64_t*)pKey)[0]) {
        // we have already found the existed items, no need to added to cache anymore.
        taosThreadMutexUnlock(pLock);
        return TSDB_CODE_SUCCESS;
      } else {  // not equal, append it
        tdListAppend(&(*pEntry)->list, pKey);
      }
    }
  }

  // add to cache.
  taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeTbGroupCachePayload, NULL,
                     TAOS_LRU_PRIORITY_LOW);
_end:
  taosThreadMutexUnlock(pLock);
  metaDebug("vgId:%d, suid:%" PRIu64 " tb group added into cache, total:%d, tables:%d", vgId, suid,
            (int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));

  return code;
}

// remove the lru cache that are expired due to the tags value update, or creating, or dropping, of child tables
// Drop every cached table group result that belongs to super table suid.
//
// Each digest recorded for the table is combined with the hash address and suid into a composed
// key and erased from the LRU cache; the digest list is then emptied and hitTimes reset.
// Always returns TSDB_CODE_SUCCESS; nothing is done when the table has no entry or an empty list.
int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid) {
  int32_t        vgId = TD_VID(pMeta->pVnode);
  SHashObj*      pEntryHashMap = pMeta->pCache->STbGroupResCache.pTableEntry;
  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;

  // fixed part of the key; the digest words are overwritten for every list node below
  uint64_t zeroDigest[2] = {0};
  uint64_t lruKey[4] = {0};
  initCacheKey(lruKey, pEntryHashMap, suid, (char*)&zeroDigest[0], 16);

  bool cleared = false;

  taosThreadMutexLock(pLock);

  STagFilterResEntry** ppEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
  if (ppEntry != NULL && listNEles(&(*ppEntry)->list) != 0) {
    (*ppEntry)->hitTimes = 0;

    SListIter  iter = {0};
    SListNode* pCur = NULL;
    tdListInitIter(&(*ppEntry)->list, &iter, TD_LIST_FORWARD);
    for (pCur = tdListNext(&iter); pCur != NULL; pCur = tdListNext(&iter)) {
      setMD5DigestInKey(lruKey, pCur->data, 2 * sizeof(uint64_t));
      taosLRUCacheErase(pMeta->pCache->STbGroupResCache.pResCache, lruKey, TAG_FILTER_RES_KEY_LEN);
    }

    tdListEmpty(&(*ppEntry)->list);
    cleared = true;
  }

  taosThreadMutexUnlock(pLock);

  // only the path that actually cleared something is logged
  if (cleared) {
    metaDebug("vgId:%d suid:%"PRId64" cached related tb group cleared", vgId, suid);
  }
  return TSDB_CODE_SUCCESS;
}