未验证 提交 2668def4 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #19076 from taosdata/feature/3_liaohj

fix(query): add lock for cache.
......@@ -54,6 +54,7 @@ struct SMetaCache {
// query cache
struct STagFilterResCache {
TdThreadMutex lock;
SHashObj* pTableEntry;
SLRUCache* pUidResCache;
uint64_t keyBuf[3];
......@@ -140,6 +141,8 @@ int32_t metaCacheOpen(SMeta* pMeta) {
}
taosHashSetFreeFp(pCache->sTagFilterResCache.pTableEntry, freeCacheEntryFp);
taosThreadMutexInit(&pCache->sTagFilterResCache.lock, NULL);
pMeta->pCache = pCache;
return code;
......@@ -159,6 +162,8 @@ void metaCacheClose(SMeta* pMeta) {
taosHashCleanup(pMeta->pCache->sTagFilterResCache.pTableEntry);
taosLRUCacheCleanup(pMeta->pCache->sTagFilterResCache.pUidResCache);
taosThreadMutexDestroy(&pMeta->pCache->sTagFilterResCache.lock);
taosMemoryFree(pMeta->pCache);
pMeta->pCache = NULL;
}
......@@ -422,63 +427,78 @@ int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) {
int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
bool* acquireRes) {
uint64_t* pBuf = pMeta->pCache->sTagFilterResCache.keyBuf;
// generate the composed key for LRU cache
SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
uint64_t* pBuf = pMeta->pCache->sTagFilterResCache.keyBuf;
SHashObj* pTableMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
uint32_t times = 0;
*acquireRes = 0;
pBuf[0] = suid;
memcpy(&pBuf[1], pKey, keyLen);
taosThreadMutexLock(pLock);
int32_t len = keyLen + sizeof(uint64_t);
LRUHandle* pHandle = taosLRUCacheLookup(pCache, pBuf, len);
if (pHandle == NULL) {
*acquireRes = 0;
taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS;
} else { // do some book mark work after acquiring the filter result from cache
STagFilterResEntry** pEntry = taosHashGet(pMeta->pCache->sTagFilterResCache.pTableEntry, &suid, sizeof(uint64_t));
ASSERT(pEntry != NULL);
*acquireRes = 1;
const char* p = taosLRUCacheValue(pMeta->pCache->sTagFilterResCache.pUidResCache, pHandle);
int32_t size = *(int32_t*)p;
taosArrayAddBatch(pList1, p + sizeof(int32_t), size);
(*pEntry)->qTimes += 1;
taosLRUCacheRelease(pCache, pHandle, false);
// check if scanning all items are necessary or not
if ((*pEntry)->qTimes >= 5000 && TD_DLIST_NELES(&(*pEntry)->list) > 10) {
SArray* pList = taosArrayInit(64, POINTER_BYTES);
SListIter iter = {0};
tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD);
SListNode* pNode = NULL;
while ((pNode = tdListNext(&iter)) != NULL) {
memcpy(&pBuf[1], pNode->data, keyLen);
// check whether it is existed in LRU cache, and remove it from linked list if not.
LRUHandle* pRes = taosLRUCacheLookup(pCache, pBuf, len);
if (pRes == NULL) { // remove the item in the linked list
taosArrayPush(pList, &pNode);
} else {
taosLRUCacheRelease(pCache, pRes, false);
}
}
}
// remove the keys, of which query uid lists have been replaced already.
size_t s = taosArrayGetSize(pList);
for (int32_t i = 0; i < s; ++i) {
SListNode** p1 = taosArrayGet(pList, i);
tdListPopNode(&(*pEntry)->list, *p1);
taosMemoryFree(*p1);
}
// do some book mark work after acquiring the filter result from cache
STagFilterResEntry** pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
ASSERT(pEntry != NULL);
*acquireRes = 1;
(*pEntry)->qTimes = 0; // reset the query times
const char* p = taosLRUCacheValue(pCache, pHandle);
int32_t size = *(int32_t*)p;
taosArrayDestroy(pList);
// set the result into the buffer
taosArrayAddBatch(pList1, p + sizeof(int32_t), size);
times = atomic_add_fetch_32(&(*pEntry)->qTimes, 1);
taosLRUCacheRelease(pCache, pHandle, false);
// unlock meta
taosThreadMutexUnlock(pLock);
// check if scanning all items are necessary or not
if (times >= 5000 && TD_DLIST_NELES(&(*pEntry)->list) > 10) {
taosThreadMutexLock(pLock);
SArray* pInvalidRes = taosArrayInit(64, POINTER_BYTES);
SListIter iter = {0};
tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD);
SListNode* pNode = NULL;
while ((pNode = tdListNext(&iter)) != NULL) {
memcpy(&pBuf[1], pNode->data, keyLen);
// check whether it is existed in LRU cache, and remove it from linked list if not.
LRUHandle* pRes = taosLRUCacheLookup(pCache, pBuf, len);
if (pRes == NULL) { // remove the item in the linked list
taosArrayPush(pInvalidRes, &pNode);
} else {
taosLRUCacheRelease(pCache, pRes, false);
}
}
// remove the keys, of which query uid lists have been replaced already.
size_t s = taosArrayGetSize(pInvalidRes);
for (int32_t i = 0; i < s; ++i) {
SListNode** p1 = taosArrayGet(pInvalidRes, i);
tdListPopNode(&(*pEntry)->list, *p1);
taosMemoryFree(*p1);
}
atomic_store_32(&(*pEntry)->qTimes, 0); // reset the query times
taosArrayDestroy(pInvalidRes);
taosThreadMutexUnlock(pLock);
}
return TSDB_CODE_SUCCESS;
......@@ -510,8 +530,11 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
return TSDB_CODE_SUCCESS;
}
SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
SHashObj* pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry;
SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
SHashObj* pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry;
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
taosThreadMutexLock(pLock);
STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
if (pEntry == NULL) {
......@@ -533,6 +556,9 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
// add to cache.
taosLRUCacheInsert(pCache, pBuf, sizeof(uint64_t) + keyLen, pPayload, payloadLen, freePayload, NULL,
TAOS_LRU_PRIORITY_LOW);
taosThreadMutexUnlock(pLock);
metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", TD_VID(pMeta->pVnode), suid,
(int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
......@@ -541,15 +567,19 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
// remove the lru cache that are expired due to the tags value update, or creating, or dropping, of child tables
int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
int32_t keyLen = sizeof(uint64_t) * 3;
uint64_t p[3] = {0};
p[0] = suid;
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
taosThreadMutexLock(pLock);
STagFilterResEntry** pEntry = taosHashGet(pMeta->pCache->sTagFilterResCache.pTableEntry, &suid, sizeof(uint64_t));
if (pEntry == NULL || listNEles(&(*pEntry)->list) == 0) {
taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS;
}
int32_t keyLen = sizeof(uint64_t) * 3;
uint64_t p[3] = {0};
p[0] = suid;
SListIter iter = {0};
tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD);
......@@ -562,5 +592,6 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
(*pEntry)->qTimes = 0;
tdListEmpty(&(*pEntry)->list);
taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS;
}
......@@ -3522,6 +3522,8 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
}
int32_t code = tRowMergerGetRow(&merge, pTSRow);
tRowMergerClear(&merge);
return code;
}
......
......@@ -621,11 +621,13 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) {
const SDiskbasedBufStatis* ps = &pBuf->statis;
#if 0
printf(
"Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page size:%.2f "
"Kb, %s\n",
pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id);
#endif
if (ps->loadPages > 0) {
printf(
......@@ -634,7 +636,7 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) {
ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f,
ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages));
} else {
printf("no page loaded\n");
//printf("no page loaded\n");
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册