metaCache.c 28.3 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#include "meta.h"

17
// Byte length of the composed LRU key built by initCacheKey():
// hash table address (8) + suid (8) + MD5 digest (16).
#define TAG_FILTER_RES_KEY_LEN  32
18 19
// Initial bucket counts of the table-entry cache and of the super-table stats
// cache. The drop functions never shrink a cache once its bucket count is no
// longer above the matching value.
#define META_CACHE_BASE_BUCKET  1024
#define META_CACHE_STATS_BUCKET 16
H
Hongze Cheng 已提交
20 21 22 23 24 25 26 27 28 29

// One node of a bucket chain in the table-entry cache.
// The (uid, suid) pair kept in `info` tells which kind of table is cached:
//   (uid , suid) : child table
//   (uid ,    0) : normal table
//   (suid, suid) : super table
typedef struct SMetaCacheEntry {
  struct SMetaCacheEntry* next;  // next entry hashed into the same bucket
  SMetaInfo               info;  // cached table info, looked up by info.uid
} SMetaCacheEntry;

30 31 32 33 34
// One node of a bucket chain in the super-table stats cache.
typedef struct SMetaStbStatsEntry SMetaStbStatsEntry;
struct SMetaStbStatsEntry {
  SMetaStbStatsEntry* next;  // next entry hashed into the same bucket
  SMetaStbStats       info;  // cached stats, looked up by info.uid
};

35
typedef struct STagFilterResEntry {
X
Xiaoyu Wang 已提交
36
  SList    list;      // the linked list of md5 digest, extracted from the serialized tag query condition
37
  uint32_t hitTimes;  // queried times for current super table
38 39
} STagFilterResEntry;

H
Hongze Cheng 已提交
40
struct SMetaCache {
41 42 43 44 45 46
  // child, normal, super, table entry cache
  struct SEntryCache {
    int32_t           nEntry;
    int32_t           nBucket;
    SMetaCacheEntry** aBucket;
  } sEntryCache;
47 48 49 50 51 52 53 54 55

  // stable stats cache
  struct SStbStatsCache {
    int32_t              nEntry;
    int32_t              nBucket;
    SMetaStbStatsEntry** aBucket;
  } sStbStatsCache;

  // query cache
56
  struct STagFilterResCache {
H
Haojun Liao 已提交
57
    TdThreadMutex lock;
X
Xiaoyu Wang 已提交
58
    uint32_t      accTimes;
59 60
    SHashObj*     pTableEntry;
    SLRUCache*    pUidResCache;
61
  } sTagFilterResCache;
D
dapan1121 已提交
62 63 64 65 66 67 68

  struct STbGroupResCache {
    TdThreadMutex lock;
    uint32_t      accTimes;
    SHashObj*     pTableEntry;
    SLRUCache*    pResCache;
  } STbGroupResCache;
69 70

  struct STbFilterCache {
K
kailixu 已提交
71
    SHashObj* pStb;
72
  } STbFilterCache;
H
Hongze Cheng 已提交
73 74
};

75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
// Releases every entry of the table-entry cache together with its bucket
// array. Nothing is done when pMeta has no cache; the SMetaCache object itself
// is not freed here.
static void entryCacheClose(SMeta* pMeta) {
  if (pMeta->pCache == NULL) {
    return;
  }

  int32_t           numBuckets = pMeta->pCache->sEntryCache.nBucket;
  SMetaCacheEntry** buckets = pMeta->pCache->sEntryCache.aBucket;

  for (int32_t i = 0; i < numBuckets; i++) {
    // walk the chain, remembering the successor before the node is freed
    for (SMetaCacheEntry* pCur = buckets[i]; pCur != NULL;) {
      SMetaCacheEntry* pNext = pCur->next;
      taosMemoryFree(pCur);
      pCur = pNext;
    }
  }

  taosMemoryFree(buckets);
}

// Releases every entry of the super-table stats cache together with its bucket
// array. Nothing is done when pMeta has no cache; the SMetaCache object itself
// is not freed here.
static void statsCacheClose(SMeta* pMeta) {
  if (pMeta->pCache == NULL) {
    return;
  }

  int32_t              numBuckets = pMeta->pCache->sStbStatsCache.nBucket;
  SMetaStbStatsEntry** buckets = pMeta->pCache->sStbStatsCache.aBucket;

  for (int32_t i = 0; i < numBuckets; i++) {
    // walk the chain, remembering the successor before the node is freed
    for (SMetaStbStatsEntry* pCur = buckets[i]; pCur != NULL;) {
      SMetaStbStatsEntry* pNext = pCur->next;
      taosMemoryFree(pCur);
      pCur = pNext;
    }
  }

  taosMemoryFree(buckets);
}

H
Haojun Liao 已提交
105 106 107 108 109 110
static void freeCacheEntryFp(void* param) {
  STagFilterResEntry** p = param;
  tdListEmpty(&(*p)->list);
  taosMemoryFreeClear(*p);
}

H
Hongze Cheng 已提交
111 112 113 114 115 116 117 118 119 120
// Creates the cache object of pMeta and opens all of its sub-caches: the table
// entry cache, the super-table stats cache, the tag filter result cache, the
// table group result cache and the table filter cache.
//
// pMeta : meta handle. pMeta->pCache is assigned only when everything was
//         opened successfully.
// return: 0 on success, TSDB_CODE_OUT_OF_MEMORY when any allocation fails. On
//         failure every resource opened so far is released again.
int32_t metaCacheOpen(SMeta* pMeta) {
  int32_t     code = 0;
  bool        tagFilterLockReady = false;
  bool        tbGroupLockReady = false;
  SMetaCache* pCache = NULL;

  // zero-filled, so that on failure every member that was never opened is
  // still NULL and can be told apart from the ones that need a cleanup
  pCache = (SMetaCache*)taosMemoryCalloc(1, sizeof(SMetaCache));
  if (pCache == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  // open entry cache
  pCache->sEntryCache.nEntry = 0;
  pCache->sEntryCache.nBucket = META_CACHE_BASE_BUCKET;
  pCache->sEntryCache.aBucket =
      (SMetaCacheEntry**)taosMemoryCalloc(pCache->sEntryCache.nBucket, sizeof(SMetaCacheEntry*));
  if (pCache->sEntryCache.aBucket == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  // open stats cache
  pCache->sStbStatsCache.nEntry = 0;
  pCache->sStbStatsCache.nBucket = META_CACHE_STATS_BUCKET;
  pCache->sStbStatsCache.aBucket =
      (SMetaStbStatsEntry**)taosMemoryCalloc(pCache->sStbStatsCache.nBucket, sizeof(SMetaStbStatsEntry*));
  if (pCache->sStbStatsCache.aBucket == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  // open tag filter result cache
  pCache->sTagFilterResCache.pUidResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
  if (pCache->sTagFilterResCache.pUidResCache == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pCache->sTagFilterResCache.accTimes = 0;
  pCache->sTagFilterResCache.pTableEntry =
      taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
  if (pCache->sTagFilterResCache.pTableEntry == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  taosHashSetFreeFp(pCache->sTagFilterResCache.pTableEntry, freeCacheEntryFp);
  taosThreadMutexInit(&pCache->sTagFilterResCache.lock, NULL);
  tagFilterLockReady = true;

  // open table group result cache
  pCache->STbGroupResCache.pResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
  if (pCache->STbGroupResCache.pResCache == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pCache->STbGroupResCache.accTimes = 0;
  pCache->STbGroupResCache.pTableEntry =
      taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
  if (pCache->STbGroupResCache.pTableEntry == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  taosHashSetFreeFp(pCache->STbGroupResCache.pTableEntry, freeCacheEntryFp);
  taosThreadMutexInit(&pCache->STbGroupResCache.lock, NULL);
  tbGroupLockReady = true;

  // open table filter cache
  pCache->STbFilterCache.pStb = taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pCache->STbFilterCache.pStb == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pMeta->pCache = pCache;
  return code;

_err:
  if (pCache != NULL) {
    // pMeta->pCache has not been assigned yet, so the *CacheClose helpers
    // (which go through pMeta->pCache) cannot be used: release through pCache,
    // in reverse order of creation. Both bucket arrays are still empty here,
    // so only the arrays themselves have to be freed.
    if (pCache->STbFilterCache.pStb != NULL) {
      taosHashCleanup(pCache->STbFilterCache.pStb);
    }

    if (tbGroupLockReady) {
      taosThreadMutexDestroy(&pCache->STbGroupResCache.lock);
    }
    if (pCache->STbGroupResCache.pTableEntry != NULL) {
      taosHashCleanup(pCache->STbGroupResCache.pTableEntry);
    }
    if (pCache->STbGroupResCache.pResCache != NULL) {
      taosLRUCacheCleanup(pCache->STbGroupResCache.pResCache);
    }

    if (tagFilterLockReady) {
      taosThreadMutexDestroy(&pCache->sTagFilterResCache.lock);
    }
    if (pCache->sTagFilterResCache.pTableEntry != NULL) {
      taosHashCleanup(pCache->sTagFilterResCache.pTableEntry);
    }
    if (pCache->sTagFilterResCache.pUidResCache != NULL) {
      taosLRUCacheCleanup(pCache->sTagFilterResCache.pUidResCache);
    }

    taosMemoryFree(pCache->sStbStatsCache.aBucket);
    taosMemoryFree(pCache->sEntryCache.aBucket);
    taosMemoryFree(pCache);
  }

  metaError("vgId:%d, meta open cache failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
  return code;
}

// Releases every sub-cache of pMeta, then the cache object itself, and resets
// pMeta->pCache to NULL. Calling it on a meta without a cache is a no-op.
void metaCacheClose(SMeta* pMeta) {
  if (pMeta->pCache == NULL) {
    return;
  }

  // table entry cache and super-table stats cache
  entryCacheClose(pMeta);
  statsCacheClose(pMeta);

  // tag filter result cache
  taosLRUCacheCleanup(pMeta->pCache->sTagFilterResCache.pUidResCache);
  taosThreadMutexDestroy(&pMeta->pCache->sTagFilterResCache.lock);
  taosHashCleanup(pMeta->pCache->sTagFilterResCache.pTableEntry);

  // table group result cache
  taosLRUCacheCleanup(pMeta->pCache->STbGroupResCache.pResCache);
  taosThreadMutexDestroy(&pMeta->pCache->STbGroupResCache.lock);
  taosHashCleanup(pMeta->pCache->STbGroupResCache.pTableEntry);

  // table filter cache
  taosHashCleanup(pMeta->pCache->STbFilterCache.pStb);

  taosMemoryFree(pMeta->pCache);
  pMeta->pCache = NULL;
}

// Rebuilds the table-entry cache with twice (expand != 0) or half (expand == 0)
// as many buckets and moves every entry into the new bucket array.
//
// return: 0 on success; TSDB_CODE_OUT_OF_MEMORY when the new bucket array
//         cannot be allocated, in which case the cache is left unchanged.
static int32_t metaRehashCache(SMetaCache* pCache, int8_t expand) {
  int32_t oldSize = pCache->sEntryCache.nBucket;
  int32_t newSize = expand ? oldSize * 2 : oldSize / 2;

  SMetaCacheEntry** newBuckets = (SMetaCacheEntry**)taosMemoryCalloc(newSize, sizeof(SMetaCacheEntry*));
  if (newBuckets == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // relink every entry at the head of its new chain
  SMetaCacheEntry** oldBuckets = pCache->sEntryCache.aBucket;
  for (int32_t i = 0; i < oldSize; i++) {
    SMetaCacheEntry* pCur = oldBuckets[i];
    while (pCur != NULL) {
      SMetaCacheEntry* pNext = pCur->next;
      int32_t          slot = TABS(pCur->info.uid) % newSize;

      pCur->next = newBuckets[slot];
      newBuckets[slot] = pCur;

      pCur = pNext;
    }
  }

  // swap in the new array
  taosMemoryFree(oldBuckets);
  pCache->sEntryCache.nBucket = newSize;
  pCache->sEntryCache.aBucket = newBuckets;

  return 0;
}

// Inserts pInfo into the table-entry cache, or refreshes the cached version and
// skmVer when an entry with the same uid exists and pInfo carries a newer
// version. Per the original author's note, meta is write-locked by the caller.
//
// return: 0 on success; TSDB_CODE_FAILED when the cached entry has a different
//         suid; TSDB_CODE_OUT_OF_MEMORY (or the rehash error) when the insert
//         cannot allocate.
int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo) {
  SMetaCache* pCache = pMeta->pCache;
  int32_t     slot = TABS(pInfo->uid) % pCache->sEntryCache.nBucket;

  // search the chain for an entry with the same uid
  SMetaCacheEntry* pHit = pCache->sEntryCache.aBucket[slot];
  while (pHit != NULL && pHit->info.uid != pInfo->uid) {
    pHit = pHit->next;
  }

  if (pHit != NULL) {  // update
    if (pHit->info.suid != pInfo->suid) {
      metaError("meta/cache: suid should be same as the one in cache.");
      return TSDB_CODE_FAILED;
    }
    if (pInfo->version > pHit->info.version) {
      pHit->info.version = pInfo->version;
      pHit->info.skmVer = pInfo->skmVer;
    }
    return 0;
  }

  // insert: grow first once there are as many entries as buckets
  if (pCache->sEntryCache.nEntry >= pCache->sEntryCache.nBucket) {
    int32_t rc = metaRehashCache(pCache, 1);
    if (rc) {
      return rc;
    }

    // the bucket count changed, so the slot has to be recomputed
    slot = TABS(pInfo->uid) % pCache->sEntryCache.nBucket;
  }

  SMetaCacheEntry* pFresh = (SMetaCacheEntry*)taosMemoryMalloc(sizeof(*pFresh));
  if (pFresh == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pFresh->info = *pInfo;
  pFresh->next = pCache->sEntryCache.aBucket[slot];
  pCache->sEntryCache.aBucket[slot] = pFresh;
  pCache->sEntryCache.nEntry++;

  return 0;
}

// Removes the entry with the given uid from the table-entry cache and shrinks
// the bucket array when fewer than a quarter of the buckets are in use (never
// below META_CACHE_BASE_BUCKET).
//
// return: 0 on success, TSDB_CODE_NOT_FOUND when uid is not cached, or the
//         error of the shrinking rehash.
int32_t metaCacheDrop(SMeta* pMeta, int64_t uid) {
  SMetaCache* pCache = pMeta->pCache;
  int32_t     slot = TABS(uid) % pCache->sEntryCache.nBucket;

  // ppLink always addresses the pointer that leads to the node under inspection
  SMetaCacheEntry** ppLink = &pCache->sEntryCache.aBucket[slot];
  while (*ppLink != NULL && (*ppLink)->info.uid != uid) {
    ppLink = &(*ppLink)->next;
  }

  SMetaCacheEntry* pVictim = *ppLink;
  if (pVictim == NULL) {
    return TSDB_CODE_NOT_FOUND;
  }

  *ppLink = pVictim->next;
  taosMemoryFree(pVictim);
  pCache->sEntryCache.nEntry--;

  if (pCache->sEntryCache.nBucket > META_CACHE_BASE_BUCKET &&
      pCache->sEntryCache.nEntry < pCache->sEntryCache.nBucket / 4) {
    return metaRehashCache(pCache, 0);
  }

  return 0;
}

// Looks up uid in the table-entry cache and copies the cached info into *pInfo.
//
// return: 0 when found, TSDB_CODE_NOT_FOUND otherwise (*pInfo is then left
//         untouched).
int32_t metaCacheGet(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo) {
  SMetaCache* pCache = pMeta->pCache;
  int32_t     slot = TABS(uid) % pCache->sEntryCache.nBucket;

  for (SMetaCacheEntry* pCur = pCache->sEntryCache.aBucket[slot]; pCur != NULL; pCur = pCur->next) {
    if (pCur->info.uid == uid) {
      *pInfo = pCur->info;
      return 0;
    }
  }

  return TSDB_CODE_NOT_FOUND;
}
345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387

// Rebuilds the super-table stats cache with twice (expand != 0) or half
// (expand == 0) as many buckets and moves every entry into the new array.
//
// return: 0 on success; TSDB_CODE_OUT_OF_MEMORY when the new bucket array
//         cannot be allocated, in which case the cache is left unchanged.
static int32_t metaRehashStatsCache(SMetaCache* pCache, int8_t expand) {
  int32_t oldSize = pCache->sStbStatsCache.nBucket;
  int32_t newSize = expand ? oldSize * 2 : oldSize / 2;

  SMetaStbStatsEntry** newBuckets = (SMetaStbStatsEntry**)taosMemoryCalloc(newSize, sizeof(SMetaStbStatsEntry*));
  if (newBuckets == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // relink every entry at the head of its new chain
  SMetaStbStatsEntry** oldBuckets = pCache->sStbStatsCache.aBucket;
  for (int32_t i = 0; i < oldSize; i++) {
    SMetaStbStatsEntry* pCur = oldBuckets[i];
    while (pCur != NULL) {
      SMetaStbStatsEntry* pNext = pCur->next;
      int32_t             slot = TABS(pCur->info.uid) % newSize;

      pCur->next = newBuckets[slot];
      newBuckets[slot] = pCur;

      pCur = pNext;
    }
  }

  // swap in the new array
  taosMemoryFree(oldBuckets);
  pCache->sStbStatsCache.nBucket = newSize;
  pCache->sStbStatsCache.aBucket = newBuckets;

  return 0;
}

// Inserts pInfo into the super-table stats cache, or overwrites the cached
// ctbNum when an entry with the same uid already exists. Per the original
// author's note, meta is write-locked by the caller.
//
// return: 0 on success; TSDB_CODE_OUT_OF_MEMORY (or the rehash error) when the
//         insert cannot allocate.
int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo) {
  SMetaCache* pCache = pMeta->pCache;
  int32_t     slot = TABS(pInfo->uid) % pCache->sStbStatsCache.nBucket;

  // search the chain for an entry with the same uid
  SMetaStbStatsEntry* pHit = pCache->sStbStatsCache.aBucket[slot];
  while (pHit != NULL && pHit->info.uid != pInfo->uid) {
    pHit = pHit->next;
  }

  if (pHit != NULL) {  // update
    pHit->info.ctbNum = pInfo->ctbNum;
    return 0;
  }

  // insert: grow first once there are as many entries as buckets
  if (pCache->sStbStatsCache.nEntry >= pCache->sStbStatsCache.nBucket) {
    int32_t rc = metaRehashStatsCache(pCache, 1);
    if (rc) {
      return rc;
    }

    // the bucket count changed, so the slot has to be recomputed
    slot = TABS(pInfo->uid) % pCache->sStbStatsCache.nBucket;
  }

  SMetaStbStatsEntry* pFresh = (SMetaStbStatsEntry*)taosMemoryMalloc(sizeof(*pFresh));
  if (pFresh == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pFresh->info = *pInfo;
  pFresh->next = pCache->sStbStatsCache.aBucket[slot];
  pCache->sStbStatsCache.aBucket[slot] = pFresh;
  pCache->sStbStatsCache.nEntry++;

  return 0;
}

// Removes the entry with the given uid from the super-table stats cache and
// shrinks the bucket array when fewer than a quarter of the buckets are in use
// (never below META_CACHE_STATS_BUCKET).
//
// return: 0 on success, TSDB_CODE_NOT_FOUND when uid is not cached, or the
//         error of the shrinking rehash.
int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid) {
  SMetaCache* pCache = pMeta->pCache;
  int32_t     slot = TABS(uid) % pCache->sStbStatsCache.nBucket;

  // ppLink always addresses the pointer that leads to the node under inspection
  SMetaStbStatsEntry** ppLink = &pCache->sStbStatsCache.aBucket[slot];
  while (*ppLink != NULL && (*ppLink)->info.uid != uid) {
    ppLink = &(*ppLink)->next;
  }

  SMetaStbStatsEntry* pVictim = *ppLink;
  if (pVictim == NULL) {
    return TSDB_CODE_NOT_FOUND;
  }

  *ppLink = pVictim->next;
  taosMemoryFree(pVictim);
  pCache->sStbStatsCache.nEntry--;

  if (pCache->sStbStatsCache.nBucket > META_CACHE_STATS_BUCKET &&
      pCache->sStbStatsCache.nEntry < pCache->sStbStatsCache.nBucket / 4) {
    return metaRehashStatsCache(pCache, 0);
  }

  return 0;
}

// Looks up uid in the super-table stats cache and copies the cached stats into
// *pInfo.
//
// return: TSDB_CODE_SUCCESS when found, TSDB_CODE_NOT_FOUND otherwise (*pInfo
//         is then left untouched).
int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) {
  SMetaCache* pCache = pMeta->pCache;
  int32_t     slot = TABS(uid) % pCache->sStbStatsCache.nBucket;

  for (SMetaStbStatsEntry* pCur = pCache->sStbStatsCache.aBucket[slot]; pCur != NULL; pCur = pCur->next) {
    if (pCur->info.uid == uid) {
      *pInfo = pCur->info;
      return TSDB_CODE_SUCCESS;
    }
  }

  return TSDB_CODE_NOT_FOUND;
}
471

X
Xiaoyu Wang 已提交
472 473
// Walks the digest list of pEntry and collects, in pInvalidRes, the list nodes
// whose key can no longer be found in the LRU cache. Nodes are only collected
// here, not removed from the list.
//
// pEntry      : per-super-table entry whose digest list is checked
// pInvalidRes : receives one SListNode* per digest missing from pCache
// keyLen      : number of bytes copied from each list node into the lookup key
// pCache      : LRU cache to probe
// suid        : super table uid, first word of the lookup key
// return      : always 0
//
// NOTE(review): the lookup key built here is three words (suid + digest, 24
// bytes), whereas initCacheKey() builds a four-word key that starts with the
// hash table address - confirm this layout is still the intended one.
static int checkAllEntriesInCache(const STagFilterResEntry* pEntry, SArray* pInvalidRes, int32_t keyLen,
                                  SLRUCache* pCache, uint64_t suid) {
  uint64_t lookupKey[3];
  lookupKey[0] = suid;

  int32_t lookupLen = sizeof(uint64_t) * tListLen(lookupKey);

  SListIter it = {0};
  tdListInitIter((SList*)&(pEntry->list), &it, TD_LIST_FORWARD);

  for (SListNode* pCur = tdListNext(&it); pCur != NULL; pCur = tdListNext(&it)) {
    memcpy(&lookupKey[1], pCur->data, keyLen);

    LRUHandle* pFound = taosLRUCacheLookup(pCache, lookupKey, lookupLen);
    if (pFound != NULL) {
      // still cached: only give the handle back
      taosLRUCacheRelease(pCache, pFound, false);
      continue;
    }

    // no longer cached: report the node to the caller
    taosArrayPush(pInvalidRes, &pCur);
  }

  return 0;
}

498
// Copies keyLen bytes of the digest `key` into the composed cache key pBuf,
// starting at its third word (pBuf[2]). The length is not validated here.
static FORCE_INLINE void setMD5DigestInKey(uint64_t* pBuf, const char* key, int32_t keyLen) {
  uint64_t* pDigestSlot = pBuf + 2;
  memcpy(pDigestSlot, key, keyLen);
}

// Fills `buf` with the composed LRU cache key.
// the format of key:
// hash table address(8bytes) + suid(8bytes) + MD5 digest(16bytes)
//
// buf      : output, must have room for four uint64_t words
// pHashMap : per-super-table hash table the key belongs to
// suid     : super table uid
// key      : MD5 digest bytes
// keyLen   : length of `key`, must be 16
static void initCacheKey(uint64_t* buf, const SHashObj* pHashMap, uint64_t suid, const char* key, int32_t keyLen) {
  // checked before anything is written: a longer digest would run past buf[3]
  ASSERT(keyLen == sizeof(uint64_t) * 2);

  buf[0] = (uint64_t)pHashMap;
  buf[1] = suid;
  setMD5DigestInKey(buf, key, keyLen);
}

512
// Looks up the cached tag filter result for (suid, digest) and, on a hit,
// appends the cached items to pList1.
//
// pVnode     : SVnode handle
// suid       : super table uid
// pKey       : MD5 digest of the serialized tag condition
// keyLen     : length of pKey (16, asserted in initCacheKey)
// pList1     : receives the cached items on a hit
// acquireRes : set to 1 on a cache hit, 0 otherwise
// return     : TSDB_CODE_SUCCESS on hit or miss; TSDB_CODE_FAILED when the LRU
//              cache holds the key but the super table has no book-keeping
//              entry.
int32_t metaGetCachedTableUidList(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
                                  bool* acquireRes) {
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
  int32_t vgId = TD_VID(pMeta->pVnode);

  // generate the composed key for LRU cache
  SLRUCache*     pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
  SHashObj*      pTableMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
  TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;

  *acquireRes = 0;
  uint64_t key[4];
  initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen);

  taosThreadMutexLock(pLock);
  pMeta->pCache->sTagFilterResCache.accTimes += 1;

  LRUHandle* pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN);
  if (pHandle == NULL) {
    taosThreadMutexUnlock(pLock);
    return TSDB_CODE_SUCCESS;
  }

  // do some book mark work after acquiring the filter result from cache
  STagFilterResEntry** pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
  if (NULL == pEntry) {
    metaError("meta/cache: pEntry should not be NULL.");
    // give back the handle and the lock, otherwise the next caller blocks forever
    taosLRUCacheRelease(pCache, pHandle, false);
    taosThreadMutexUnlock(pLock);
    return TSDB_CODE_FAILED;
  }

  *acquireRes = 1;

  // payload layout: an int32_t item count followed by the items themselves
  const char* p = taosLRUCacheValue(pCache, pHandle);
  int32_t     size = *(int32_t*)p;

  // set the result into the buffer
  taosArrayAddBatch(pList1, p + sizeof(int32_t), size);

  (*pEntry)->hitTimes += 1;

  uint32_t acc = pMeta->pCache->sTagFilterResCache.accTimes;
  if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) {
    metaInfo("vgId:%d cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc,
             ((double)(*pEntry)->hitTimes) / acc);
  }

  taosLRUCacheRelease(pCache, pHandle, false);

  // unlock meta
  taosThreadMutexUnlock(pLock);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
565 566
static void freeUidCachePayload(const void* key, size_t keyLen, void* value, void* ud) {
  (void)ud;
H
Haojun Liao 已提交
567 568 569
  if (value == NULL) {
    return;
  }
570 571 572

  const uint64_t* p = key;
  if (keyLen != sizeof(int64_t) * 4) {
573
    metaError("key length is invalid, length:%d, expect:%d", (int32_t)keyLen, (int32_t)sizeof(uint64_t) * 2);
574 575
    return;
  }
576

577 578
  SHashObj* pHashObj = (SHashObj*)p[0];

579
  STagFilterResEntry** pEntry = taosHashGet(pHashObj, &p[1], sizeof(uint64_t));
dengyihao's avatar
dengyihao 已提交
580

581
  if (pEntry != NULL && (*pEntry) != NULL) {
582
    int64_t st = taosGetTimestampUs();
H
Haojun Liao 已提交
583 584

    SListIter iter = {0};
585
    tdListInitIter((SList*)&((*pEntry)->list), &iter, TD_LIST_FORWARD);
H
Haojun Liao 已提交
586 587 588

    SListNode* pNode = NULL;
    while ((pNode = tdListNext(&iter)) != NULL) {
589 590
      uint64_t* digest = (uint64_t*)pNode->data;
      if (digest[0] == p[2] && digest[1] == p[3]) {
591 592
        void* tmp = tdListPopNode(&((*pEntry)->list), pNode);
        taosMemoryFree(tmp);
593

594 595 596
        double el = (taosGetTimestampUs() - st) / 1000.0;
        metaInfo("clear items in meta-cache, remain cached item:%d, elapsed time:%.2fms", listNEles(&((*pEntry)->list)),
                 el);
597
        break;
H
Haojun Liao 已提交
598 599
      }
    }
600 601
  }

H
Haojun Liao 已提交
602
  taosMemoryFree(value);
603 604
}

605 606 607 608
// Creates the book-keeping entry of super table `suid`, registers it in
// pTableEntry and records pKey as its first digest.
//
// pTableEntry : hash table suid -> STagFilterResEntry*
// pKey        : digest to record, keyLen bytes
// keyLen      : element size used for the digest list
// suid        : super table uid
// return      : 0 on success, TSDB_CODE_OUT_OF_MEMORY when the entry cannot be
//               allocated, registered, or extended with the digest.
static int32_t addNewEntry(SHashObj* pTableEntry, const void* pKey, int32_t keyLen, uint64_t suid) {
  STagFilterResEntry* p = taosMemoryMalloc(sizeof(STagFilterResEntry));
  if (p == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  p->hitTimes = 0;
  tdListInit(&p->list, keyLen);

  if (taosHashPut(pTableEntry, &suid, sizeof(uint64_t), &p, POINTER_BYTES) != 0) {
    // the hash table did not take the entry, so it is still ours to free
    // NOTE(review): the failure is reported as out-of-memory - confirm against taosHashPut
    taosMemoryFree(p);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (tdListAppend(&p->list, pKey) != 0) {
    // the entry stays registered with an empty digest list; the put functions
    // already handle that state
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return 0;
}

618
// check both the payload size and selectivity ratio
619
int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
dengyihao's avatar
dengyihao 已提交
620
                              int32_t payloadLen, double selectivityRatio) {
621
  int32_t code = 0;
D
dapan1121 已提交
622
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
623 624
  int32_t vgId = TD_VID(pMeta->pVnode);

H
Haojun Liao 已提交
625
  if (selectivityRatio > tsSelectivityRatio) {
H
Haojun Liao 已提交
626 627
    metaDebug("vgId:%d, suid:%" PRIu64
              " failed to add to uid list cache, due to selectivity ratio %.2f less than threshold %.2f",
628
              vgId, suid, selectivityRatio, tsSelectivityRatio);
H
Haojun Liao 已提交
629
    taosMemoryFree(pPayload);
H
Haojun Liao 已提交
630 631 632 633
    return TSDB_CODE_SUCCESS;
  }

  if (payloadLen > tsTagFilterResCacheSize) {
H
Haojun Liao 已提交
634 635
    metaDebug("vgId:%d, suid:%" PRIu64
              " failed to add to uid list cache, due to payload length %d greater than threshold %d",
636
              vgId, suid, payloadLen, tsTagFilterResCacheSize);
H
Haojun Liao 已提交
637
    taosMemoryFree(pPayload);
H
Haojun Liao 已提交
638 639 640
    return TSDB_CODE_SUCCESS;
  }

H
Haojun Liao 已提交
641 642 643
  SLRUCache*     pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
  SHashObj*      pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry;
  TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
H
Haojun Liao 已提交
644

645 646
  uint64_t key[4] = {0};
  initCacheKey(key, pTableEntry, suid, pKey, keyLen);
647

H
Haojun Liao 已提交
648
  taosThreadMutexLock(pLock);
H
Haojun Liao 已提交
649
  STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
H
Haojun Liao 已提交
650
  if (pEntry == NULL) {
651 652 653 654
    code = addNewEntry(pTableEntry, pKey, keyLen, suid);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }
D
dapan1121 已提交
655
  } else {  // check if it exists or not
H
Haojun Liao 已提交
656 657 658
    size_t size = listNEles(&(*pEntry)->list);
    if (size == 0) {
      tdListAppend(&(*pEntry)->list, pKey);
659
    } else {
H
Haojun Liao 已提交
660
      SListNode* pNode = listHead(&(*pEntry)->list);
661
      uint64_t*  p = (uint64_t*)pNode->data;
H
Haojun Liao 已提交
662
      if (p[1] == ((uint64_t*)pKey)[1] && p[0] == ((uint64_t*)pKey)[0]) {
663
        // we have already found the existed items, no need to added to cache anymore.
664 665
        taosThreadMutexUnlock(pLock);
        return TSDB_CODE_SUCCESS;
666
      } else {  // not equal, append it
H
Haojun Liao 已提交
667 668 669
        tdListAppend(&(*pEntry)->list, pKey);
      }
    }
670
  }
H
Haojun Liao 已提交
671 672

  // add to cache.
D
dapan1121 已提交
673
  taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeUidCachePayload, NULL,
D
dapan1121 已提交
674
                     TAOS_LRU_PRIORITY_LOW, NULL);
X
Xiaoyu Wang 已提交
675
_end:
H
Haojun Liao 已提交
676
  taosThreadMutexUnlock(pLock);
677
  metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", vgId, suid,
dengyihao's avatar
dengyihao 已提交
678
            (int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
H
Haojun Liao 已提交
679

680
  return code;
681 682 683 684
}

// Removes every cached tag filter result of super table `suid`; such results
// expire when tag values are updated or child tables are created or dropped.
// The hit counter of the super table is reset as well.
//
// return: always TSDB_CODE_SUCCESS
int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
  int32_t        vgId = TD_VID(pMeta->pVnode);
  SHashObj*      pTableMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
  TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;

  // key prefix (hash table address + suid); the digest part is filled in per node below
  uint64_t lruKey[4] = {0};
  uint64_t zeroDigest[2] = {0};
  initCacheKey(lruKey, pTableMap, suid, (char*)&zeroDigest[0], 16);

  taosThreadMutexLock(pLock);

  STagFilterResEntry** ppEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
  bool                 nothingCached = (ppEntry == NULL) || (listNEles(&(*ppEntry)->list) == 0);
  if (nothingCached) {
    taosThreadMutexUnlock(pLock);
    return TSDB_CODE_SUCCESS;
  }

  (*ppEntry)->hitTimes = 0;

  SListIter it = {0};
  tdListInitIter(&(*ppEntry)->list, &it, TD_LIST_FORWARD);

  // erase the LRU item of every recorded digest
  for (SListNode* pCur = tdListNext(&it); pCur != NULL; pCur = tdListNext(&it)) {
    setMD5DigestInKey(lruKey, pCur->data, 2 * sizeof(uint64_t));
    taosLRUCacheErase(pMeta->pCache->sTagFilterResCache.pUidResCache, lruKey, TAG_FILTER_RES_KEY_LEN);
  }

  tdListEmpty(&(*ppEntry)->list);
  taosThreadMutexUnlock(pLock);

  metaDebug("vgId:%d suid:%" PRId64 " cached related tag filter uid list cleared", vgId, suid);
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
718

719
// Looks up the cached table group result for (suid, digest) and, on a hit,
// returns a duplicate of the cached array.
//
// pVnode : SVnode handle
// suid   : super table uid
// pKey   : MD5 digest identifying the group condition
// keyLen : length of pKey (16, asserted in initCacheKey)
// pList  : set to NULL on a miss, to a copy made with taosArrayDup() on a hit
// return : TSDB_CODE_SUCCESS on hit or miss; TSDB_CODE_FAILED when the LRU
//          cache holds the key but the super table has no book-keeping entry.
int32_t metaGetCachedTbGroup(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList) {
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
  int32_t vgId = TD_VID(pMeta->pVnode);

  // generate the composed key for LRU cache
  SLRUCache*     pCache = pMeta->pCache->STbGroupResCache.pResCache;
  SHashObj*      pTableMap = pMeta->pCache->STbGroupResCache.pTableEntry;
  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;

  *pList = NULL;
  uint64_t key[4];
  initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen);

  taosThreadMutexLock(pLock);
  pMeta->pCache->STbGroupResCache.accTimes += 1;

  LRUHandle* pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN);
  if (pHandle == NULL) {
    taosThreadMutexUnlock(pLock);
    return TSDB_CODE_SUCCESS;
  }

  STagFilterResEntry** pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
  if (NULL == pEntry) {
    metaDebug("suid %" PRIu64 " not in tb group cache", suid);
    // give back the handle and the lock, otherwise the next caller blocks forever
    taosLRUCacheRelease(pCache, pHandle, false);
    taosThreadMutexUnlock(pLock);
    return TSDB_CODE_FAILED;
  }

  *pList = taosArrayDup(taosLRUCacheValue(pCache, pHandle), NULL);

  (*pEntry)->hitTimes += 1;

  uint32_t acc = pMeta->pCache->STbGroupResCache.accTimes;
  if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) {
    metaInfo("vgId:%d tb group cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc,
             ((double)(*pEntry)->hitTimes) / acc);
  }

  taosLRUCacheRelease(pCache, pHandle, false);

  // unlock meta
  taosThreadMutexUnlock(pLock);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
764 765
// Deleter registered with the table group LRU cache (see
// metaPutTbGroupToCache). Removes the digest of the dropped item from the
// digest list of its super table and destroys the cached array.
//
// key    : composed key as built by initCacheKey()
// keyLen : length of key, expected to be four uint64_t words
// value  : cached SArray to destroy; nothing is done when it is NULL
// ud     : unused
static void freeTbGroupCachePayload(const void* key, size_t keyLen, void* value, void* ud) {
  (void)ud;
  if (value == NULL) {
    return;
  }

  const uint64_t* p = key;
  if (keyLen != sizeof(int64_t) * 4) {
    metaError("tb group key length is invalid, length:%d, expect:%d", (int32_t)keyLen,
              (int32_t)(sizeof(uint64_t) * 4));
    // the digest list cannot be located from a malformed key, but the payload
    // still has to be released
    taosArrayDestroy((SArray*)value);
    return;
  }

  // p[0]: hash table address, p[1]: suid, p[2..3]: digest
  SHashObj* pHashObj = (SHashObj*)p[0];

  STagFilterResEntry** pEntry = taosHashGet(pHashObj, &p[1], sizeof(uint64_t));

  if (pEntry != NULL && (*pEntry) != NULL) {
    int64_t st = taosGetTimestampUs();

    SListIter iter = {0};
    tdListInitIter((SList*)&((*pEntry)->list), &iter, TD_LIST_FORWARD);

    SListNode* pNode = NULL;
    while ((pNode = tdListNext(&iter)) != NULL) {
      uint64_t* digest = (uint64_t*)pNode->data;
      if (digest[0] == p[2] && digest[1] == p[3]) {
        void* tmp = tdListPopNode(&((*pEntry)->list), pNode);
        taosMemoryFree(tmp);

        double el = (taosGetTimestampUs() - st) / 1000.0;
        metaDebug("clear one item in tb group cache, remain cached item:%d, elapsed time:%.2fms",
                  listNEles(&((*pEntry)->list)), el);
        break;
      }
    }
  }

  taosArrayDestroy((SArray*)value);
}

804
int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
D
dapan1121 已提交
805 806
                              int32_t payloadLen) {
  int32_t code = 0;
D
dapan1121 已提交
807
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
D
dapan1121 已提交
808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831
  int32_t vgId = TD_VID(pMeta->pVnode);

  if (payloadLen > tsTagFilterResCacheSize) {
    metaDebug("vgId:%d, suid:%" PRIu64
              " ignore to add to tb group cache, due to payload length %d greater than threshold %d",
              vgId, suid, payloadLen, tsTagFilterResCacheSize);
    taosArrayDestroy((SArray*)pPayload);
    return TSDB_CODE_SUCCESS;
  }

  SLRUCache*     pCache = pMeta->pCache->STbGroupResCache.pResCache;
  SHashObj*      pTableEntry = pMeta->pCache->STbGroupResCache.pTableEntry;
  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;

  uint64_t key[4] = {0};
  initCacheKey(key, pTableEntry, suid, pKey, keyLen);

  taosThreadMutexLock(pLock);
  STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
  if (pEntry == NULL) {
    code = addNewEntry(pTableEntry, pKey, keyLen, suid);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }
D
dapan1121 已提交
832
  } else {  // check if it exists or not
D
dapan1121 已提交
833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850
    size_t size = listNEles(&(*pEntry)->list);
    if (size == 0) {
      tdListAppend(&(*pEntry)->list, pKey);
    } else {
      SListNode* pNode = listHead(&(*pEntry)->list);
      uint64_t*  p = (uint64_t*)pNode->data;
      if (p[1] == ((uint64_t*)pKey)[1] && p[0] == ((uint64_t*)pKey)[0]) {
        // we have already found the existed items, no need to added to cache anymore.
        taosThreadMutexUnlock(pLock);
        return TSDB_CODE_SUCCESS;
      } else {  // not equal, append it
        tdListAppend(&(*pEntry)->list, pKey);
      }
    }
  }

  // add to cache.
  taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeTbGroupCachePayload, NULL,
D
dapan1121 已提交
851
                     TAOS_LRU_PRIORITY_LOW, NULL);
D
dapan1121 已提交
852 853 854 855 856 857 858 859 860 861 862 863 864 865 866
_end:
  taosThreadMutexUnlock(pLock);
  metaDebug("vgId:%d, suid:%" PRIu64 " tb group added into cache, total:%d, tables:%d", vgId, suid,
            (int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));

  return code;
}

// Removes every cached table group result of super table `suid`; such results
// expire when tag values are updated or child tables are created or dropped.
// The hit counter of the super table is reset as well.
//
// return: always TSDB_CODE_SUCCESS
int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid) {
  int32_t        vgId = TD_VID(pMeta->pVnode);
  SHashObj*      pTableMap = pMeta->pCache->STbGroupResCache.pTableEntry;
  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;

  // key prefix (hash table address + suid); the digest part is filled in per node below
  uint64_t lruKey[4] = {0};
  uint64_t zeroDigest[2] = {0};
  initCacheKey(lruKey, pTableMap, suid, (char*)&zeroDigest[0], 16);

  taosThreadMutexLock(pLock);

  STagFilterResEntry** ppEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
  bool                 nothingCached = (ppEntry == NULL) || (listNEles(&(*ppEntry)->list) == 0);
  if (nothingCached) {
    taosThreadMutexUnlock(pLock);
    return TSDB_CODE_SUCCESS;
  }

  (*ppEntry)->hitTimes = 0;

  SListIter it = {0};
  tdListInitIter(&(*ppEntry)->list, &it, TD_LIST_FORWARD);

  // erase the LRU item of every recorded digest
  for (SListNode* pCur = tdListNext(&it); pCur != NULL; pCur = tdListNext(&it)) {
    setMD5DigestInKey(lruKey, pCur->data, 2 * sizeof(uint64_t));
    taosLRUCacheErase(pMeta->pCache->STbGroupResCache.pResCache, lruKey, TAG_FILTER_RES_KEY_LEN);
  }

  tdListEmpty(&(*ppEntry)->list);
  taosThreadMutexUnlock(pLock);

  metaDebug("vgId:%d suid:%" PRId64 " cached related tb group cleared", vgId, suid);
  return TSDB_CODE_SUCCESS;
}
895 896 897 898

// Tells whether `suid` is present in the table filter cache. Only type 0 (the
// super table set) is supported; any other type yields false.
bool metaTbInFilterCache(void* pVnode, tb_uid_t suid, int8_t type) {
  SMeta* pMeta = ((SVnode*)pVnode)->pMeta;

  if (type != 0) {
    return false;
  }

  return taosHashGet(pMeta->pCache->STbFilterCache.pStb, &suid, sizeof(suid)) != NULL;
}

// Records `suid` in the table filter cache. Only type 0 (the super table set)
// is stored; any other type is ignored.
//
// return: the result of taosHashPut() for type 0, otherwise 0.
int32_t metaPutTbToFilterCache(void* pVnode, tb_uid_t suid, int8_t type) {
  SMeta* pMeta = ((SVnode*)pVnode)->pMeta;

  if (type != 0) {
    return 0;
  }

  // the key alone is stored, without any value
  return taosHashPut(pMeta->pCache->STbFilterCache.pStb, &suid, sizeof(suid), NULL, 0);
}

int32_t metaSizeOfTbFilterCache(void* pVnode, int8_t type) {
  SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
  if (type == 0) {
K
kailixu 已提交
919
    return taosHashGetSize(pMeta->pCache->STbFilterCache.pStb);
920 921 922
  }
  return 0;
}