metaCache.c 27.2 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#include "meta.h"

17
// Byte length of the composed LRU key used by the result caches: four uint64_t words (see initCacheKey).
#define TAG_FILTER_RES_KEY_LEN  32

// Initial bucket count of the table entry cache; the table is never shrunk below this size.
#define META_CACHE_BASE_BUCKET  1024
// Initial bucket count of the super table stats cache; the table is never shrunk below this size.
#define META_CACHE_STATS_BUCKET 16
H
Hongze Cheng 已提交
20 21 22 23 24 25 26 27 28 29

// One node of the table entry cache. The (uid, suid) pair stored in `info` identifies the table kind:
// (uid , suid) : child table
// (uid,     0) : normal table
// (suid, suid) : super table
typedef struct SMetaCacheEntry SMetaCacheEntry;
struct SMetaCacheEntry {
  SMetaCacheEntry* next;  // next entry in the same hash bucket (singly linked collision chain)
  SMetaInfo        info;  // cached table info; info.uid is the lookup key
};

30 31 32 33 34
// One node of the super table stats cache, chained per hash bucket.
typedef struct SMetaStbStatsEntry {
  struct SMetaStbStatsEntry* next;  // next entry in the same hash bucket
  SMetaStbStats              info;  // cached stats; info.uid is the lookup key
} SMetaStbStatsEntry;

35
typedef struct STagFilterResEntry {
X
Xiaoyu Wang 已提交
36
  SList    list;      // the linked list of md5 digest, extracted from the serialized tag query condition
37
  uint32_t hitTimes;  // queried times for current super table
38 39
} STagFilterResEntry;

H
Hongze Cheng 已提交
40
struct SMetaCache {
41 42 43 44 45 46
  // child, normal, super, table entry cache
  struct SEntryCache {
    int32_t           nEntry;
    int32_t           nBucket;
    SMetaCacheEntry** aBucket;
  } sEntryCache;
47 48 49 50 51 52 53 54 55

  // stable stats cache
  struct SStbStatsCache {
    int32_t              nEntry;
    int32_t              nBucket;
    SMetaStbStatsEntry** aBucket;
  } sStbStatsCache;

  // query cache
56
  struct STagFilterResCache {
H
Haojun Liao 已提交
57
    TdThreadMutex lock;
X
Xiaoyu Wang 已提交
58
    uint32_t      accTimes;
59 60
    SHashObj*     pTableEntry;
    SLRUCache*    pUidResCache;
61
  } sTagFilterResCache;
D
dapan1121 已提交
62 63 64 65 66 67 68

  struct STbGroupResCache {
    TdThreadMutex lock;
    uint32_t      accTimes;
    SHashObj*     pTableEntry;
    SLRUCache*    pResCache;
  } STbGroupResCache;
H
Hongze Cheng 已提交
69 70
};

71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
// Free every node of the table entry cache and then the bucket array itself.
// Does nothing when the meta object has no cache attached.
static void entryCacheClose(SMeta* pMeta) {
  SMetaCache* pCache = pMeta->pCache;
  if (pCache == NULL) {
    return;
  }

  for (int32_t i = 0; i < pCache->sEntryCache.nBucket; ++i) {
    SMetaCacheEntry* pCur = pCache->sEntryCache.aBucket[i];
    while (pCur != NULL) {
      SMetaCacheEntry* pNext = pCur->next;  // remember the successor before the node goes away
      taosMemoryFree(pCur);
      pCur = pNext;
    }
  }

  taosMemoryFree(pCache->sEntryCache.aBucket);
}

// Free every node of the super table stats cache and then the bucket array itself.
// Does nothing when the meta object has no cache attached.
static void statsCacheClose(SMeta* pMeta) {
  SMetaCache* pCache = pMeta->pCache;
  if (pCache == NULL) {
    return;
  }

  for (int32_t i = 0; i < pCache->sStbStatsCache.nBucket; ++i) {
    SMetaStbStatsEntry* pCur = pCache->sStbStatsCache.aBucket[i];
    while (pCur != NULL) {
      SMetaStbStatsEntry* pNext = pCur->next;  // remember the successor before the node goes away
      taosMemoryFree(pCur);
      pCur = pNext;
    }
  }

  taosMemoryFree(pCache->sStbStatsCache.aBucket);
}

H
Haojun Liao 已提交
101 102 103 104 105 106
static void freeCacheEntryFp(void* param) {
  STagFilterResEntry** p = param;
  tdListEmpty(&(*p)->list);
  taosMemoryFreeClear(*p);
}

H
Hongze Cheng 已提交
107 108 109 110 111 112 113 114 115 116
/**
 * Create all caches of `pMeta` and attach them as pMeta->pCache.
 *
 * The cache object is only published in pMeta->pCache once every member has
 * been set up. Until then everything hangs off the local `pCache`, so the
 * error path releases resources through `pCache` (not through pMeta).
 *
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY when any allocation fails; in
 *         the failure case nothing is left allocated and pMeta->pCache is
 *         not modified.
 */
int32_t metaCacheOpen(SMeta* pMeta) {
  int32_t     code = 0;
  bool        tagFilterLockInited = false;
  SMetaCache* pCache = NULL;

  // zero-filled so that the error path can tell which members have been created
  pCache = (SMetaCache*)taosMemoryCalloc(1, sizeof(SMetaCache));
  if (pCache == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  // open entry cache
  pCache->sEntryCache.nEntry = 0;
  pCache->sEntryCache.nBucket = META_CACHE_BASE_BUCKET;
  pCache->sEntryCache.aBucket =
      (SMetaCacheEntry**)taosMemoryCalloc(pCache->sEntryCache.nBucket, sizeof(SMetaCacheEntry*));
  if (pCache->sEntryCache.aBucket == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  // open stats cache
  pCache->sStbStatsCache.nEntry = 0;
  pCache->sStbStatsCache.nBucket = META_CACHE_STATS_BUCKET;
  pCache->sStbStatsCache.aBucket =
      (SMetaStbStatsEntry**)taosMemoryCalloc(pCache->sStbStatsCache.nBucket, sizeof(SMetaStbStatsEntry*));
  if (pCache->sStbStatsCache.aBucket == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  // open tag filter result cache
  pCache->sTagFilterResCache.pUidResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
  if (pCache->sTagFilterResCache.pUidResCache == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pCache->sTagFilterResCache.accTimes = 0;
  pCache->sTagFilterResCache.pTableEntry =
      taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
  if (pCache->sTagFilterResCache.pTableEntry == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  taosHashSetFreeFp(pCache->sTagFilterResCache.pTableEntry, freeCacheEntryFp);
  taosThreadMutexInit(&pCache->sTagFilterResCache.lock, NULL);
  tagFilterLockInited = true;

  // open table group cache
  pCache->STbGroupResCache.pResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
  if (pCache->STbGroupResCache.pResCache == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pCache->STbGroupResCache.accTimes = 0;
  pCache->STbGroupResCache.pTableEntry =
      taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
  if (pCache->STbGroupResCache.pTableEntry == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  taosHashSetFreeFp(pCache->STbGroupResCache.pTableEntry, freeCacheEntryFp);
  taosThreadMutexInit(&pCache->STbGroupResCache.lock, NULL);

  pMeta->pCache = pCache;
  return code;

_err:
  if (pCache != NULL) {
    // The table group hash table is the last step that can fail, so neither it nor
    // the table group lock can exist here; everything created before is released.
    if (pCache->STbGroupResCache.pResCache != NULL) {
      taosLRUCacheCleanup(pCache->STbGroupResCache.pResCache);
    }
    if (tagFilterLockInited) {
      taosThreadMutexDestroy(&pCache->sTagFilterResCache.lock);
    }
    if (pCache->sTagFilterResCache.pTableEntry != NULL) {
      taosHashCleanup(pCache->sTagFilterResCache.pTableEntry);
    }
    if (pCache->sTagFilterResCache.pUidResCache != NULL) {
      taosLRUCacheCleanup(pCache->sTagFilterResCache.pUidResCache);
    }
    // the bucket arrays are still empty, so only the arrays themselves need freeing
    taosMemoryFree(pCache->sStbStatsCache.aBucket);
    taosMemoryFree(pCache->sEntryCache.aBucket);
    taosMemoryFree(pCache);
  }
  metaError("vgId:%d, meta open cache failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
  return code;
}

/**
 * Destroy every cache attached to `pMeta` and reset pMeta->pCache to NULL.
 * Does nothing when no cache is attached.
 */
void metaCacheClose(SMeta* pMeta) {
  if (pMeta->pCache) {
    entryCacheClose(pMeta);
    statsCacheClose(pMeta);

    // tag filter result cache
    taosLRUCacheCleanup(pMeta->pCache->sTagFilterResCache.pUidResCache);
    taosThreadMutexDestroy(&pMeta->pCache->sTagFilterResCache.lock);
    taosHashCleanup(pMeta->pCache->sTagFilterResCache.pTableEntry);

    // table group cache
    taosLRUCacheCleanup(pMeta->pCache->STbGroupResCache.pResCache);
    taosThreadMutexDestroy(&pMeta->pCache->STbGroupResCache.lock);
    taosHashCleanup(pMeta->pCache->STbGroupResCache.pTableEntry);

    taosMemoryFree(pMeta->pCache);
    pMeta->pCache = NULL;
  }
}

/**
 * Resize the table entry cache and redistribute all entries.
 *
 * @param pCache cache whose sEntryCache is resized
 * @param expand non-zero doubles the bucket count, zero halves it
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY if the new bucket array cannot
 *         be allocated (the cache is left untouched in that case)
 */
static int32_t metaRehashCache(SMetaCache* pCache, int8_t expand) {
  int32_t code = 0;
  int32_t nBucket;

  if (expand) {
    nBucket = pCache->sEntryCache.nBucket * 2;
  } else {
    nBucket = pCache->sEntryCache.nBucket / 2;
  }

  SMetaCacheEntry** aBucket = (SMetaCacheEntry**)taosMemoryCalloc(nBucket, sizeof(SMetaCacheEntry*));
  if (aBucket == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // rehash: unlink each node from its old chain and push it onto the head of its new chain
  for (int32_t iBucket = 0; iBucket < pCache->sEntryCache.nBucket; iBucket++) {
    SMetaCacheEntry* pEntry = pCache->sEntryCache.aBucket[iBucket];

    while (pEntry) {
      SMetaCacheEntry* pTEntry = pEntry->next;

      pEntry->next = aBucket[TABS(pEntry->info.uid) % nBucket];
      aBucket[TABS(pEntry->info.uid) % nBucket] = pEntry;

      pEntry = pTEntry;
    }
  }

  // final set
  taosMemoryFree(pCache->sEntryCache.aBucket);
  pCache->sEntryCache.nBucket = nBucket;
  pCache->sEntryCache.aBucket = aBucket;

_exit:
  return code;
}

/**
 * Insert `pInfo` into the table entry cache, or refresh the cached entry that
 * has the same uid.
 *
 * An existing entry is only refreshed (version and skmVer) when `pInfo`
 * carries a greater version. The bucket array is doubled before an insert
 * once the entry count has reached the bucket count.
 *
 * @return 0 on success, TSDB_CODE_FAILED when the cached suid differs from
 *         pInfo->suid, TSDB_CODE_OUT_OF_MEMORY on allocation failure.
 */
int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo) {
  int32_t code = 0;

  // meta is wlocked for calling this func.

  // search
  SMetaCache*       pCache = pMeta->pCache;
  int32_t           iBucket = TABS(pInfo->uid) % pCache->sEntryCache.nBucket;
  SMetaCacheEntry** ppEntry = &pCache->sEntryCache.aBucket[iBucket];
  while (*ppEntry && (*ppEntry)->info.uid != pInfo->uid) {
    ppEntry = &(*ppEntry)->next;
  }

  if (*ppEntry) {  // update
    if (pInfo->suid != (*ppEntry)->info.suid) {
      metaError("meta/cache: suid should be same as the one in cache.");
      return TSDB_CODE_FAILED;
    }
    if (pInfo->version > (*ppEntry)->info.version) {
      (*ppEntry)->info.version = pInfo->version;
      (*ppEntry)->info.skmVer = pInfo->skmVer;
    }
  } else {  // insert
    if (pCache->sEntryCache.nEntry >= pCache->sEntryCache.nBucket) {
      code = metaRehashCache(pCache, 1);
      if (code) goto _exit;

      // the bucket count changed, so the target bucket has to be recomputed
      iBucket = TABS(pInfo->uid) % pCache->sEntryCache.nBucket;
    }

    SMetaCacheEntry* pEntryNew = (SMetaCacheEntry*)taosMemoryMalloc(sizeof(*pEntryNew));
    if (pEntryNew == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _exit;
    }

    pEntryNew->info = *pInfo;
    pEntryNew->next = pCache->sEntryCache.aBucket[iBucket];
    pCache->sEntryCache.aBucket[iBucket] = pEntryNew;
    pCache->sEntryCache.nEntry++;
  }

_exit:
  return code;
}

/**
 * Remove the entry with the given uid from the table entry cache.
 *
 * After a removal the bucket array is halved when fewer than a quarter of the
 * buckets are in use and the array is larger than META_CACHE_BASE_BUCKET.
 *
 * @return 0 on success, TSDB_CODE_NOT_FOUND when no such entry is cached, or
 *         the error of the shrinking rehash (the entry has already been
 *         removed in that case).
 */
int32_t metaCacheDrop(SMeta* pMeta, int64_t uid) {
  int32_t code = 0;

  SMetaCache*       pCache = pMeta->pCache;
  int32_t           iBucket = TABS(uid) % pCache->sEntryCache.nBucket;
  SMetaCacheEntry** ppEntry = &pCache->sEntryCache.aBucket[iBucket];
  while (*ppEntry && (*ppEntry)->info.uid != uid) {
    ppEntry = &(*ppEntry)->next;
  }

  SMetaCacheEntry* pEntry = *ppEntry;
  if (pEntry) {
    *ppEntry = pEntry->next;
    taosMemoryFree(pEntry);
    pCache->sEntryCache.nEntry--;
    if (pCache->sEntryCache.nEntry < pCache->sEntryCache.nBucket / 4 &&
        pCache->sEntryCache.nBucket > META_CACHE_BASE_BUCKET) {
      code = metaRehashCache(pCache, 0);
      if (code) goto _exit;
    }
  } else {
    code = TSDB_CODE_NOT_FOUND;
  }

_exit:
  return code;
}

/**
 * Look up the cached table info for `uid`.
 *
 * @param pInfo receives a copy of the cached info when found; untouched otherwise
 * @return 0 when found, TSDB_CODE_NOT_FOUND otherwise
 */
int32_t metaCacheGet(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo) {
  int32_t code = 0;

  SMetaCache*      pCache = pMeta->pCache;
  int32_t          iBucket = TABS(uid) % pCache->sEntryCache.nBucket;
  SMetaCacheEntry* pEntry = pCache->sEntryCache.aBucket[iBucket];

  while (pEntry && pEntry->info.uid != uid) {
    pEntry = pEntry->next;
  }

  if (pEntry) {
    *pInfo = pEntry->info;
  } else {
    code = TSDB_CODE_NOT_FOUND;
  }

  return code;
}
333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375

// Resize the super table stats cache: double the bucket count when `expand` is
// non-zero, halve it otherwise, and move every node into the new bucket array.
// Returns 0 on success or TSDB_CODE_OUT_OF_MEMORY, in which case the cache is unchanged.
static int32_t metaRehashStatsCache(SMetaCache* pCache, int8_t expand) {
  int32_t nOld = pCache->sStbStatsCache.nBucket;
  int32_t nNew = expand ? nOld * 2 : nOld / 2;

  SMetaStbStatsEntry** aNew = (SMetaStbStatsEntry**)taosMemoryCalloc(nNew, sizeof(SMetaStbStatsEntry*));
  if (aNew == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // walk every old chain and push each node onto the head of its new chain
  for (int32_t i = 0; i < nOld; i++) {
    SMetaStbStatsEntry* pCur = pCache->sStbStatsCache.aBucket[i];
    while (pCur != NULL) {
      SMetaStbStatsEntry*  pNext = pCur->next;
      SMetaStbStatsEntry** ppHead = &aNew[TABS(pCur->info.uid) % nNew];

      pCur->next = *ppHead;
      *ppHead = pCur;

      pCur = pNext;
    }
  }

  // swap in the new bucket array
  taosMemoryFree(pCache->sStbStatsCache.aBucket);
  pCache->sStbStatsCache.aBucket = aNew;
  pCache->sStbStatsCache.nBucket = nNew;

  return 0;
}

/**
 * Insert `pInfo` into the super table stats cache, or overwrite the ctbNum of
 * the cached entry that has the same uid.
 *
 * The bucket array is doubled before an insert once the entry count has
 * reached the bucket count.
 *
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY on allocation failure
 */
int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo) {
  int32_t code = 0;

  // meta is wlocked for calling this func.

  // search
  SMetaCache*          pCache = pMeta->pCache;
  int32_t              iBucket = TABS(pInfo->uid) % pCache->sStbStatsCache.nBucket;
  SMetaStbStatsEntry** ppEntry = &pCache->sStbStatsCache.aBucket[iBucket];
  while (*ppEntry && (*ppEntry)->info.uid != pInfo->uid) {
    ppEntry = &(*ppEntry)->next;
  }

  if (*ppEntry) {  // update
    (*ppEntry)->info.ctbNum = pInfo->ctbNum;
  } else {  // insert
    if (pCache->sStbStatsCache.nEntry >= pCache->sStbStatsCache.nBucket) {
      code = metaRehashStatsCache(pCache, 1);
      if (code) goto _exit;

      // the bucket count changed, so the target bucket has to be recomputed
      iBucket = TABS(pInfo->uid) % pCache->sStbStatsCache.nBucket;
    }

    SMetaStbStatsEntry* pEntryNew = (SMetaStbStatsEntry*)taosMemoryMalloc(sizeof(*pEntryNew));
    if (pEntryNew == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _exit;
    }

    pEntryNew->info = *pInfo;
    pEntryNew->next = pCache->sStbStatsCache.aBucket[iBucket];
    pCache->sStbStatsCache.aBucket[iBucket] = pEntryNew;
    pCache->sStbStatsCache.nEntry++;
  }

_exit:
  return code;
}

// Remove the stats entry of super table `uid`.
// Returns TSDB_CODE_NOT_FOUND when nothing is cached for it. After a removal the
// bucket array is halved when it is less than a quarter full and larger than
// META_CACHE_STATS_BUCKET; the result of that rehash is returned.
int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid) {
  SMetaCache* pCache = pMeta->pCache;
  int32_t     slot = TABS(uid) % pCache->sStbStatsCache.nBucket;

  // find the link that points at the node to remove
  SMetaStbStatsEntry** ppLink = &pCache->sStbStatsCache.aBucket[slot];
  for (; *ppLink != NULL; ppLink = &(*ppLink)->next) {
    if ((*ppLink)->info.uid == uid) {
      break;
    }
  }

  SMetaStbStatsEntry* pVictim = *ppLink;
  if (pVictim == NULL) {
    return TSDB_CODE_NOT_FOUND;
  }

  *ppLink = pVictim->next;
  taosMemoryFree(pVictim);
  pCache->sStbStatsCache.nEntry--;

  if (pCache->sStbStatsCache.nBucket > META_CACHE_STATS_BUCKET &&
      pCache->sStbStatsCache.nEntry < pCache->sStbStatsCache.nBucket / 4) {
    return metaRehashStatsCache(pCache, 0);
  }

  return 0;
}

// Copy the cached stats of super table `uid` into `*pInfo`.
// Returns TSDB_CODE_SUCCESS when found; otherwise TSDB_CODE_NOT_FOUND and `*pInfo` is left untouched.
int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) {
  SMetaCache* pCache = pMeta->pCache;
  int32_t     slot = TABS(uid) % pCache->sStbStatsCache.nBucket;

  for (SMetaStbStatsEntry* pCur = pCache->sStbStatsCache.aBucket[slot]; pCur != NULL; pCur = pCur->next) {
    if (pCur->info.uid == uid) {
      *pInfo = pCur->info;
      return TSDB_CODE_SUCCESS;
    }
  }

  return TSDB_CODE_NOT_FOUND;
}
459

X
Xiaoyu Wang 已提交
460 461
/**
 * Collect the digest list nodes of `pEntry` that have no matching entry in the
 * LRU cache.
 *
 * For each node a lookup key of 24 bytes (suid followed by the node data) is
 * built; nodes whose key is not found are pushed (as SListNode*) into
 * `pInvalidRes`. Handles of found entries are released immediately.
 *
 * NOTE(review): `keyLen` bytes are copied into the 16 bytes that follow buf[0];
 * nothing here bounds `keyLen`, so callers must pass at most 16 - confirm.
 * NOTE(review): the key built here is suid + digest, whereas initCacheKey
 * prefixes the hash table address; verify which layout the cache really uses.
 *
 * @return always 0
 */
static int checkAllEntriesInCache(const STagFilterResEntry* pEntry, SArray* pInvalidRes, int32_t keyLen,
                                  SLRUCache* pCache, uint64_t suid) {
  SListIter iter = {0};
  tdListInitIter((SList*)&(pEntry->list), &iter, TD_LIST_FORWARD);

  SListNode* pNode = NULL;
  uint64_t   buf[3];
  buf[0] = suid;

  int32_t len = sizeof(uint64_t) * tListLen(buf);

  while ((pNode = tdListNext(&iter)) != NULL) {
    memcpy(&buf[1], pNode->data, keyLen);

    // check whether it is existed in LRU cache, and remove it from linked list if not.
    LRUHandle* pRes = taosLRUCacheLookup(pCache, buf, len);
    if (pRes == NULL) {  // remove the item in the linked list
      taosArrayPush(pInvalidRes, &pNode);
    } else {
      taosLRUCacheRelease(pCache, pRes, false);
    }
  }

  return 0;
}

486
// Copy `keyLen` bytes of digest into the key buffer starting at word 2.
// The callers in this file use a uint64_t[4] buffer, which leaves room for 16 bytes;
// this helper does not check `keyLen` itself.
static FORCE_INLINE void setMD5DigestInKey(uint64_t* pBuf, const char* key, int32_t keyLen) {
  memcpy(&pBuf[2], key, keyLen);
}

// Build the composed LRU key in `buf` (four uint64_t words).
// the format of key:
// hash table address(8bytes) + suid(8bytes) + MD5 digest(16bytes)
static void initCacheKey(uint64_t* buf, const SHashObj* pHashMap, uint64_t suid, const char* key, int32_t keyLen) {
  // validate the digest length before it is copied, so an oversized key is caught
  // ahead of the write into buf[2..3]
  ASSERT(keyLen == sizeof(uint64_t) * 2);

  buf[0] = (uint64_t)pHashMap;
  buf[1] = suid;
  setMD5DigestInKey(buf, key, keyLen);
}

500
/**
 * Fetch the cached uid list for a tag filter condition of super table `suid`.
 *
 * @param pVnode     vnode handle (an SVnode*)
 * @param suid       super table uid
 * @param pKey       digest of the serialized tag condition
 * @param keyLen     length of `pKey` in bytes (initCacheKey asserts 16)
 * @param pList1     array that receives the cached uids on a hit
 * @param acquireRes set to 1 on a cache hit, 0 otherwise
 * @return TSDB_CODE_SUCCESS on a hit or a miss; TSDB_CODE_FAILED when the LRU
 *         cache holds the key but the per-table bookkeeping entry is missing.
 *
 * The cache lock is held for the whole lookup and is released on every return
 * path, together with the LRU handle.
 */
int32_t metaGetCachedTableUidList(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
                                  bool* acquireRes) {
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
  int32_t vgId = TD_VID(pMeta->pVnode);

  // generate the composed key for LRU cache
  SLRUCache*     pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
  SHashObj*      pTableMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
  TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;

  *acquireRes = 0;
  uint64_t key[4];
  initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen);

  taosThreadMutexLock(pLock);
  pMeta->pCache->sTagFilterResCache.accTimes += 1;

  LRUHandle* pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN);
  if (pHandle == NULL) {
    taosThreadMutexUnlock(pLock);
    return TSDB_CODE_SUCCESS;
  }

  // do some book mark work after acquiring the filter result from cache
  STagFilterResEntry** pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
  if (NULL == pEntry) {
    metaError("meta/cache: pEntry should not be NULL.");
    // give back the handle and the lock, otherwise the next caller blocks forever
    taosLRUCacheRelease(pCache, pHandle, false);
    taosThreadMutexUnlock(pLock);
    return TSDB_CODE_FAILED;
  }

  *acquireRes = 1;

  // payload layout: int32_t element count followed by the uid array
  const char* p = taosLRUCacheValue(pCache, pHandle);
  int32_t     size = *(int32_t*)p;

  // set the result into the buffer
  taosArrayAddBatch(pList1, p + sizeof(int32_t), size);

  (*pEntry)->hitTimes += 1;

  uint32_t acc = pMeta->pCache->sTagFilterResCache.accTimes;
  if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) {
    metaInfo("vgId:%d cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc,
             ((double)(*pEntry)->hitTimes) / acc);
  }

  taosLRUCacheRelease(pCache, pHandle, false);

  // unlock meta
  taosThreadMutexUnlock(pLock);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
553 554
static void freeUidCachePayload(const void* key, size_t keyLen, void* value, void* ud) {
  (void)ud;
H
Haojun Liao 已提交
555 556 557
  if (value == NULL) {
    return;
  }
558 559 560

  const uint64_t* p = key;
  if (keyLen != sizeof(int64_t) * 4) {
561
    metaError("key length is invalid, length:%d, expect:%d", (int32_t)keyLen, (int32_t)sizeof(uint64_t) * 2);
562 563
    return;
  }
564

565 566
  SHashObj* pHashObj = (SHashObj*)p[0];

567
  STagFilterResEntry** pEntry = taosHashGet(pHashObj, &p[1], sizeof(uint64_t));
dengyihao's avatar
dengyihao 已提交
568

569
  if (pEntry != NULL && (*pEntry) != NULL) {
570
    int64_t st = taosGetTimestampUs();
H
Haojun Liao 已提交
571 572

    SListIter iter = {0};
573
    tdListInitIter((SList*)&((*pEntry)->list), &iter, TD_LIST_FORWARD);
H
Haojun Liao 已提交
574 575 576

    SListNode* pNode = NULL;
    while ((pNode = tdListNext(&iter)) != NULL) {
577 578
      uint64_t* digest = (uint64_t*)pNode->data;
      if (digest[0] == p[2] && digest[1] == p[3]) {
579 580
        void* tmp = tdListPopNode(&((*pEntry)->list), pNode);
        taosMemoryFree(tmp);
581

582 583 584
        double el = (taosGetTimestampUs() - st) / 1000.0;
        metaInfo("clear items in meta-cache, remain cached item:%d, elapsed time:%.2fms", listNEles(&((*pEntry)->list)),
                 el);
585
        break;
H
Haojun Liao 已提交
586 587
      }
    }
588 589
  }

H
Haojun Liao 已提交
590
  taosMemoryFree(value);
591 592
}

593 594 595 596
/**
 * Create the bookkeeping entry for super table `suid`, seed its digest list
 * with `pKey`, and register it in `pTableEntry`.
 *
 * @param pTableEntry suid -> STagFilterResEntry* hash table
 * @param pKey        digest to store as the first list element
 * @param keyLen      element size of the digest list
 * @param suid        super table uid used as hash key
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY when the entry could not be
 *         allocated, filled or stored; nothing is leaked in that case.
 */
static int32_t addNewEntry(SHashObj* pTableEntry, const void* pKey, int32_t keyLen, uint64_t suid) {
  STagFilterResEntry* p = taosMemoryMalloc(sizeof(STagFilterResEntry));
  if (p == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  p->hitTimes = 0;
  tdListInit(&p->list, keyLen);

  // fill the list before publishing the entry, so a failure can be undone locally
  if (tdListAppend(&p->list, pKey) != 0) {
    taosMemoryFree(p);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (taosHashPut(pTableEntry, &suid, sizeof(uint64_t), &p, POINTER_BYTES) != 0) {
    // the table did not take the entry, so it is still owned here
    tdListEmpty(&p->list);
    taosMemoryFree(p);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return 0;
}

606
// check both the payload size and selectivity ratio
607
int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
dengyihao's avatar
dengyihao 已提交
608
                              int32_t payloadLen, double selectivityRatio) {
609
  int32_t code = 0;
D
dapan1121 已提交
610
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
611 612
  int32_t vgId = TD_VID(pMeta->pVnode);

H
Haojun Liao 已提交
613
  if (selectivityRatio > tsSelectivityRatio) {
H
Haojun Liao 已提交
614 615
    metaDebug("vgId:%d, suid:%" PRIu64
              " failed to add to uid list cache, due to selectivity ratio %.2f less than threshold %.2f",
616
              vgId, suid, selectivityRatio, tsSelectivityRatio);
H
Haojun Liao 已提交
617
    taosMemoryFree(pPayload);
H
Haojun Liao 已提交
618 619 620 621
    return TSDB_CODE_SUCCESS;
  }

  if (payloadLen > tsTagFilterResCacheSize) {
H
Haojun Liao 已提交
622 623
    metaDebug("vgId:%d, suid:%" PRIu64
              " failed to add to uid list cache, due to payload length %d greater than threshold %d",
624
              vgId, suid, payloadLen, tsTagFilterResCacheSize);
H
Haojun Liao 已提交
625
    taosMemoryFree(pPayload);
H
Haojun Liao 已提交
626 627 628
    return TSDB_CODE_SUCCESS;
  }

H
Haojun Liao 已提交
629 630 631
  SLRUCache*     pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
  SHashObj*      pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry;
  TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
H
Haojun Liao 已提交
632

633 634
  uint64_t key[4] = {0};
  initCacheKey(key, pTableEntry, suid, pKey, keyLen);
635

H
Haojun Liao 已提交
636
  taosThreadMutexLock(pLock);
H
Haojun Liao 已提交
637
  STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
H
Haojun Liao 已提交
638
  if (pEntry == NULL) {
639 640 641 642
    code = addNewEntry(pTableEntry, pKey, keyLen, suid);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }
D
dapan1121 已提交
643
  } else {  // check if it exists or not
H
Haojun Liao 已提交
644 645 646
    size_t size = listNEles(&(*pEntry)->list);
    if (size == 0) {
      tdListAppend(&(*pEntry)->list, pKey);
647
    } else {
H
Haojun Liao 已提交
648
      SListNode* pNode = listHead(&(*pEntry)->list);
649
      uint64_t*  p = (uint64_t*)pNode->data;
H
Haojun Liao 已提交
650
      if (p[1] == ((uint64_t*)pKey)[1] && p[0] == ((uint64_t*)pKey)[0]) {
651
        // we have already found the existed items, no need to added to cache anymore.
652 653
        taosThreadMutexUnlock(pLock);
        return TSDB_CODE_SUCCESS;
654
      } else {  // not equal, append it
H
Haojun Liao 已提交
655 656 657
        tdListAppend(&(*pEntry)->list, pKey);
      }
    }
658
  }
H
Haojun Liao 已提交
659 660

  // add to cache.
D
dapan1121 已提交
661
  taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeUidCachePayload, NULL,
D
dapan1121 已提交
662
                     TAOS_LRU_PRIORITY_LOW, NULL);
X
Xiaoyu Wang 已提交
663
_end:
H
Haojun Liao 已提交
664
  taosThreadMutexUnlock(pLock);
665
  metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", vgId, suid,
dengyihao's avatar
dengyihao 已提交
666
            (int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
H
Haojun Liao 已提交
667

668
  return code;
669 670 671 672
}

/**
 * Remove the lru cache that are expired due to the tags value update, or
 * creating, or dropping, of child tables.
 *
 * Under the tag filter cache lock: resets the hit counter of `suid`, erases
 * the LRU entry of every digest registered for it, and empties the digest
 * list. Nothing happens when the super table has no registered digests.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
  uint64_t  p[4] = {0};
  int32_t   vgId = TD_VID(pMeta->pVnode);
  SHashObj* pEntryHashMap = pMeta->pCache->sTagFilterResCache.pTableEntry;

  // prepare the key prefix (table address + suid); the digest part is filled per node below
  uint64_t dummy[2] = {0};
  initCacheKey(p, pEntryHashMap, suid, (char*)&dummy[0], 16);

  TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
  taosThreadMutexLock(pLock);

  STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
  if (pEntry == NULL || listNEles(&(*pEntry)->list) == 0) {
    taosThreadMutexUnlock(pLock);
    return TSDB_CODE_SUCCESS;
  }

  (*pEntry)->hitTimes = 0;

  SListIter iter = {0};
  tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD);

  SListNode* pNode = NULL;
  while ((pNode = tdListNext(&iter)) != NULL) {
    setMD5DigestInKey(p, pNode->data, 2 * sizeof(uint64_t));
    taosLRUCacheErase(pMeta->pCache->sTagFilterResCache.pUidResCache, p, TAG_FILTER_RES_KEY_LEN);
  }

  tdListEmpty(&(*pEntry)->list);
  taosThreadMutexUnlock(pLock);

  metaDebug("vgId:%d suid:%" PRId64 " cached related tag filter uid list cleared", vgId, suid);
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
706

707
/**
 * Fetch a copy of the cached table group result for super table `suid`.
 *
 * @param pVnode vnode handle (an SVnode*)
 * @param suid   super table uid
 * @param pKey   digest identifying the grouping condition
 * @param keyLen length of `pKey` in bytes (initCacheKey asserts 16)
 * @param pList  set to a duplicate of the cached array on a hit, NULL otherwise
 * @return TSDB_CODE_SUCCESS on a hit or a miss; TSDB_CODE_FAILED when the LRU
 *         cache holds the key but the per-table bookkeeping entry is missing.
 *
 * The cache lock is held for the whole lookup and is released on every return
 * path, together with the LRU handle.
 */
int32_t metaGetCachedTbGroup(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList) {
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
  int32_t vgId = TD_VID(pMeta->pVnode);

  // generate the composed key for LRU cache
  SLRUCache*     pCache = pMeta->pCache->STbGroupResCache.pResCache;
  SHashObj*      pTableMap = pMeta->pCache->STbGroupResCache.pTableEntry;
  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;

  *pList = NULL;
  uint64_t key[4];
  initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen);

  taosThreadMutexLock(pLock);
  pMeta->pCache->STbGroupResCache.accTimes += 1;

  LRUHandle* pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN);
  if (pHandle == NULL) {
    taosThreadMutexUnlock(pLock);
    return TSDB_CODE_SUCCESS;
  }

  STagFilterResEntry** pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
  if (NULL == pEntry) {
    metaDebug("suid %" PRIu64 " not in tb group cache", suid);
    // give back the handle and the lock, otherwise the next caller blocks forever
    taosLRUCacheRelease(pCache, pHandle, false);
    taosThreadMutexUnlock(pLock);
    return TSDB_CODE_FAILED;
  }

  *pList = taosArrayDup(taosLRUCacheValue(pCache, pHandle), NULL);

  (*pEntry)->hitTimes += 1;

  uint32_t acc = pMeta->pCache->STbGroupResCache.accTimes;
  if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) {
    metaInfo("vgId:%d tb group cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc,
             ((double)(*pEntry)->hitTimes) / acc);
  }

  taosLRUCacheRelease(pCache, pHandle, false);

  // unlock meta
  taosThreadMutexUnlock(pLock);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
752 753
/**
 * Deleter installed on entries of the table group LRU cache.
 *
 * Removes the digest of the evicted key from the per-table digest list (the
 * hash table address and suid are taken from the key itself) and destroys the
 * payload array. The payload is destroyed even when the key has an unexpected
 * length, because this callback is the only place that releases it.
 *
 * @param key    composed key: hash table address, suid, 16-byte digest
 * @param keyLen length of `key`; expected to be 4 * sizeof(uint64_t)
 * @param value  payload (an SArray*) to destroy; NULL is ignored
 * @param ud     unused
 */
static void freeTbGroupCachePayload(const void* key, size_t keyLen, void* value, void* ud) {
  (void)ud;
  if (value == NULL) {
    return;
  }

  const uint64_t* p = key;
  if (keyLen != sizeof(int64_t) * 4) {
    metaError("tb group key length is invalid, length:%d, expect:%d", (int32_t)keyLen, (int32_t)sizeof(uint64_t) * 4);
    // the key cannot be decoded, but the payload still belongs to the cache
    taosArrayDestroy((SArray*)value);
    return;
  }

  SHashObj* pHashObj = (SHashObj*)p[0];

  STagFilterResEntry** pEntry = taosHashGet(pHashObj, &p[1], sizeof(uint64_t));

  if (pEntry != NULL && (*pEntry) != NULL) {
    int64_t st = taosGetTimestampUs();

    SListIter iter = {0};
    tdListInitIter((SList*)&((*pEntry)->list), &iter, TD_LIST_FORWARD);

    SListNode* pNode = NULL;
    while ((pNode = tdListNext(&iter)) != NULL) {
      uint64_t* digest = (uint64_t*)pNode->data;
      if (digest[0] == p[2] && digest[1] == p[3]) {
        void* tmp = tdListPopNode(&((*pEntry)->list), pNode);
        taosMemoryFree(tmp);

        double el = (taosGetTimestampUs() - st) / 1000.0;
        metaDebug("clear one item in tb group cache, remain cached item:%d, elapsed time:%.2fms",
                  listNEles(&((*pEntry)->list)), el);
        break;
      }
    }
  }

  taosArrayDestroy((SArray*)value);
}

792
int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
D
dapan1121 已提交
793 794
                              int32_t payloadLen) {
  int32_t code = 0;
D
dapan1121 已提交
795
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
D
dapan1121 已提交
796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819
  int32_t vgId = TD_VID(pMeta->pVnode);

  if (payloadLen > tsTagFilterResCacheSize) {
    metaDebug("vgId:%d, suid:%" PRIu64
              " ignore to add to tb group cache, due to payload length %d greater than threshold %d",
              vgId, suid, payloadLen, tsTagFilterResCacheSize);
    taosArrayDestroy((SArray*)pPayload);
    return TSDB_CODE_SUCCESS;
  }

  SLRUCache*     pCache = pMeta->pCache->STbGroupResCache.pResCache;
  SHashObj*      pTableEntry = pMeta->pCache->STbGroupResCache.pTableEntry;
  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;

  uint64_t key[4] = {0};
  initCacheKey(key, pTableEntry, suid, pKey, keyLen);

  taosThreadMutexLock(pLock);
  STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
  if (pEntry == NULL) {
    code = addNewEntry(pTableEntry, pKey, keyLen, suid);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }
D
dapan1121 已提交
820
  } else {  // check if it exists or not
D
dapan1121 已提交
821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838
    size_t size = listNEles(&(*pEntry)->list);
    if (size == 0) {
      tdListAppend(&(*pEntry)->list, pKey);
    } else {
      SListNode* pNode = listHead(&(*pEntry)->list);
      uint64_t*  p = (uint64_t*)pNode->data;
      if (p[1] == ((uint64_t*)pKey)[1] && p[0] == ((uint64_t*)pKey)[0]) {
        // we have already found the existed items, no need to added to cache anymore.
        taosThreadMutexUnlock(pLock);
        return TSDB_CODE_SUCCESS;
      } else {  // not equal, append it
        tdListAppend(&(*pEntry)->list, pKey);
      }
    }
  }

  // add to cache.
  taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeTbGroupCachePayload, NULL,
D
dapan1121 已提交
839
                     TAOS_LRU_PRIORITY_LOW, NULL);
D
dapan1121 已提交
840 841 842 843 844 845 846 847 848 849 850 851 852 853 854
_end:
  taosThreadMutexUnlock(pLock);
  metaDebug("vgId:%d, suid:%" PRIu64 " tb group added into cache, total:%d, tables:%d", vgId, suid,
            (int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));

  return code;
}

/**
 * Remove the lru cache that are expired due to the tags value update, or
 * creating, or dropping, of child tables.
 *
 * Under the table group cache lock: resets the hit counter of `suid`, erases
 * the LRU entry of every digest registered for it, and empties the digest
 * list. Nothing happens when the super table has no registered digests.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid) {
  uint64_t  p[4] = {0};
  int32_t   vgId = TD_VID(pMeta->pVnode);
  SHashObj* pEntryHashMap = pMeta->pCache->STbGroupResCache.pTableEntry;

  // prepare the key prefix (table address + suid); the digest part is filled per node below
  uint64_t dummy[2] = {0};
  initCacheKey(p, pEntryHashMap, suid, (char*)&dummy[0], 16);

  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;
  taosThreadMutexLock(pLock);

  STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
  if (pEntry == NULL || listNEles(&(*pEntry)->list) == 0) {
    taosThreadMutexUnlock(pLock);
    return TSDB_CODE_SUCCESS;
  }

  (*pEntry)->hitTimes = 0;

  SListIter iter = {0};
  tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD);

  SListNode* pNode = NULL;
  while ((pNode = tdListNext(&iter)) != NULL) {
    setMD5DigestInKey(p, pNode->data, 2 * sizeof(uint64_t));
    taosLRUCacheErase(pMeta->pCache->STbGroupResCache.pResCache, p, TAG_FILTER_RES_KEY_LEN);
  }

  tdListEmpty(&(*pEntry)->list);
  taosThreadMutexUnlock(pLock);

  metaDebug("vgId:%d suid:%" PRId64 " cached related tb group cleared", vgId, suid);
  return TSDB_CODE_SUCCESS;
}