未验证 提交 c79fd325 编写于 作者: H Hongze Cheng 提交者: GitHub

Merge pull request #17299 from taosdata/fix/TD-19332

fix: use stable stats cache to get ctbNum
...@@ -67,6 +67,10 @@ void metaCacheClose(SMeta* pMeta); ...@@ -67,6 +67,10 @@ void metaCacheClose(SMeta* pMeta);
int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo); int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo);
int32_t metaCacheDrop(SMeta* pMeta, int64_t uid); int32_t metaCacheDrop(SMeta* pMeta, int64_t uid);
int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo);
int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid);
int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo);
struct SMeta { struct SMeta {
TdThreadRwlock lock; TdThreadRwlock lock;
......
...@@ -142,6 +142,12 @@ typedef struct SMetaInfo { ...@@ -142,6 +142,12 @@ typedef struct SMetaInfo {
} SMetaInfo; } SMetaInfo;
int32_t metaGetInfo(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo); int32_t metaGetInfo(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo);
typedef struct {
int64_t uid;
int64_t ctbNum;
} SMetaStbStats;
int32_t metaGetStbStats(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo);
// tsdb // tsdb
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg); int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg);
int tsdbClose(STsdb** pTsdb); int tsdbClose(STsdb** pTsdb);
......
...@@ -14,7 +14,8 @@ ...@@ -14,7 +14,8 @@
*/ */
#include "meta.h" #include "meta.h"
#define META_CACHE_BASE_BUCKET 1024 #define META_CACHE_BASE_BUCKET 1024
#define META_CACHE_STATS_BUCKET 16
// (uid , suid) : child table // (uid , suid) : child table
// (uid, 0) : normal table // (uid, 0) : normal table
...@@ -25,12 +26,59 @@ struct SMetaCacheEntry { ...@@ -25,12 +26,59 @@ struct SMetaCacheEntry {
SMetaInfo info; SMetaInfo info;
}; };
typedef struct SMetaStbStatsEntry {
struct SMetaStbStatsEntry* next;
SMetaStbStats info;
} SMetaStbStatsEntry;
struct SMetaCache { struct SMetaCache {
int32_t nEntry; // child, normal, super, table entry cache
int32_t nBucket; struct SEntryCache {
SMetaCacheEntry** aBucket; int32_t nEntry;
int32_t nBucket;
SMetaCacheEntry** aBucket;
} sEntryCache;
// stable stats cache
struct SStbStatsCache {
int32_t nEntry;
int32_t nBucket;
SMetaStbStatsEntry** aBucket;
} sStbStatsCache;
// query cache
}; };
static void entryCacheClose(SMeta* pMeta) {
if (pMeta->pCache) {
// close entry cache
for (int32_t iBucket = 0; iBucket < pMeta->pCache->sEntryCache.nBucket; iBucket++) {
SMetaCacheEntry* pEntry = pMeta->pCache->sEntryCache.aBucket[iBucket];
while (pEntry) {
SMetaCacheEntry* tEntry = pEntry->next;
taosMemoryFree(pEntry);
pEntry = tEntry;
}
}
taosMemoryFree(pMeta->pCache->sEntryCache.aBucket);
}
}
static void statsCacheClose(SMeta* pMeta) {
if (pMeta->pCache) {
// close entry cache
for (int32_t iBucket = 0; iBucket < pMeta->pCache->sStbStatsCache.nBucket; iBucket++) {
SMetaStbStatsEntry* pEntry = pMeta->pCache->sStbStatsCache.aBucket[iBucket];
while (pEntry) {
SMetaStbStatsEntry* tEntry = pEntry->next;
taosMemoryFree(pEntry);
pEntry = tEntry;
}
}
taosMemoryFree(pMeta->pCache->sStbStatsCache.aBucket);
}
}
int32_t metaCacheOpen(SMeta* pMeta) { int32_t metaCacheOpen(SMeta* pMeta) {
int32_t code = 0; int32_t code = 0;
SMetaCache* pCache = NULL; SMetaCache* pCache = NULL;
...@@ -41,36 +89,45 @@ int32_t metaCacheOpen(SMeta* pMeta) { ...@@ -41,36 +89,45 @@ int32_t metaCacheOpen(SMeta* pMeta) {
goto _err; goto _err;
} }
pCache->nEntry = 0; // open entry cache
pCache->nBucket = META_CACHE_BASE_BUCKET; pCache->sEntryCache.nEntry = 0;
pCache->aBucket = (SMetaCacheEntry**)taosMemoryCalloc(pCache->nBucket, sizeof(SMetaCacheEntry*)); pCache->sEntryCache.nBucket = META_CACHE_BASE_BUCKET;
if (pCache->aBucket == NULL) { pCache->sEntryCache.aBucket =
(SMetaCacheEntry**)taosMemoryCalloc(pCache->sEntryCache.nBucket, sizeof(SMetaCacheEntry*));
if (pCache->sEntryCache.aBucket == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pCache);
goto _err; goto _err;
} }
// open stats cache
pCache->sStbStatsCache.nEntry = 0;
pCache->sStbStatsCache.nBucket = META_CACHE_STATS_BUCKET;
pCache->sStbStatsCache.aBucket =
(SMetaStbStatsEntry**)taosMemoryCalloc(pCache->sStbStatsCache.nBucket, sizeof(SMetaStbStatsEntry*));
if (pCache->sStbStatsCache.aBucket == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err2;
}
pMeta->pCache = pCache; pMeta->pCache = pCache;
_exit: _exit:
return code; return code;
_err2:
entryCacheClose(pMeta);
_err: _err:
taosMemoryFree(pCache);
metaError("vgId:%d meta open cache failed since %s", TD_VID(pMeta->pVnode), tstrerror(code)); metaError("vgId:%d meta open cache failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
return code; return code;
} }
void metaCacheClose(SMeta* pMeta) { void metaCacheClose(SMeta* pMeta) {
if (pMeta->pCache) { if (pMeta->pCache) {
for (int32_t iBucket = 0; iBucket < pMeta->pCache->nBucket; iBucket++) { entryCacheClose(pMeta);
SMetaCacheEntry* pEntry = pMeta->pCache->aBucket[iBucket]; statsCacheClose(pMeta);
while (pEntry) {
SMetaCacheEntry* tEntry = pEntry->next;
taosMemoryFree(pEntry);
pEntry = tEntry;
}
}
taosMemoryFree(pMeta->pCache->aBucket);
taosMemoryFree(pMeta->pCache); taosMemoryFree(pMeta->pCache);
pMeta->pCache = NULL; pMeta->pCache = NULL;
} }
...@@ -81,9 +138,9 @@ static int32_t metaRehashCache(SMetaCache* pCache, int8_t expand) { ...@@ -81,9 +138,9 @@ static int32_t metaRehashCache(SMetaCache* pCache, int8_t expand) {
int32_t nBucket; int32_t nBucket;
if (expand) { if (expand) {
nBucket = pCache->nBucket * 2; nBucket = pCache->sEntryCache.nBucket * 2;
} else { } else {
nBucket = pCache->nBucket / 2; nBucket = pCache->sEntryCache.nBucket / 2;
} }
SMetaCacheEntry** aBucket = (SMetaCacheEntry**)taosMemoryCalloc(nBucket, sizeof(SMetaCacheEntry*)); SMetaCacheEntry** aBucket = (SMetaCacheEntry**)taosMemoryCalloc(nBucket, sizeof(SMetaCacheEntry*));
...@@ -93,8 +150,8 @@ static int32_t metaRehashCache(SMetaCache* pCache, int8_t expand) { ...@@ -93,8 +150,8 @@ static int32_t metaRehashCache(SMetaCache* pCache, int8_t expand) {
} }
// rehash // rehash
for (int32_t iBucket = 0; iBucket < pCache->nBucket; iBucket++) { for (int32_t iBucket = 0; iBucket < pCache->sEntryCache.nBucket; iBucket++) {
SMetaCacheEntry* pEntry = pCache->aBucket[iBucket]; SMetaCacheEntry* pEntry = pCache->sEntryCache.aBucket[iBucket];
while (pEntry) { while (pEntry) {
SMetaCacheEntry* pTEntry = pEntry->next; SMetaCacheEntry* pTEntry = pEntry->next;
...@@ -107,9 +164,9 @@ static int32_t metaRehashCache(SMetaCache* pCache, int8_t expand) { ...@@ -107,9 +164,9 @@ static int32_t metaRehashCache(SMetaCache* pCache, int8_t expand) {
} }
// final set // final set
taosMemoryFree(pCache->aBucket); taosMemoryFree(pCache->sEntryCache.aBucket);
pCache->nBucket = nBucket; pCache->sEntryCache.nBucket = nBucket;
pCache->aBucket = aBucket; pCache->sEntryCache.aBucket = aBucket;
_exit: _exit:
return code; return code;
...@@ -122,8 +179,8 @@ int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo) { ...@@ -122,8 +179,8 @@ int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo) {
// search // search
SMetaCache* pCache = pMeta->pCache; SMetaCache* pCache = pMeta->pCache;
int32_t iBucket = TABS(pInfo->uid) % pCache->nBucket; int32_t iBucket = TABS(pInfo->uid) % pCache->sEntryCache.nBucket;
SMetaCacheEntry** ppEntry = &pCache->aBucket[iBucket]; SMetaCacheEntry** ppEntry = &pCache->sEntryCache.aBucket[iBucket];
while (*ppEntry && (*ppEntry)->info.uid != pInfo->uid) { while (*ppEntry && (*ppEntry)->info.uid != pInfo->uid) {
ppEntry = &(*ppEntry)->next; ppEntry = &(*ppEntry)->next;
} }
...@@ -135,11 +192,11 @@ int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo) { ...@@ -135,11 +192,11 @@ int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo) {
(*ppEntry)->info.skmVer = pInfo->skmVer; (*ppEntry)->info.skmVer = pInfo->skmVer;
} }
} else { // insert } else { // insert
if (pCache->nEntry >= pCache->nBucket) { if (pCache->sEntryCache.nEntry >= pCache->sEntryCache.nBucket) {
code = metaRehashCache(pCache, 1); code = metaRehashCache(pCache, 1);
if (code) goto _exit; if (code) goto _exit;
iBucket = TABS(pInfo->uid) % pCache->nBucket; iBucket = TABS(pInfo->uid) % pCache->sEntryCache.nBucket;
} }
SMetaCacheEntry* pEntryNew = (SMetaCacheEntry*)taosMemoryMalloc(sizeof(*pEntryNew)); SMetaCacheEntry* pEntryNew = (SMetaCacheEntry*)taosMemoryMalloc(sizeof(*pEntryNew));
...@@ -149,9 +206,9 @@ int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo) { ...@@ -149,9 +206,9 @@ int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo) {
} }
pEntryNew->info = *pInfo; pEntryNew->info = *pInfo;
pEntryNew->next = pCache->aBucket[iBucket]; pEntryNew->next = pCache->sEntryCache.aBucket[iBucket];
pCache->aBucket[iBucket] = pEntryNew; pCache->sEntryCache.aBucket[iBucket] = pEntryNew;
pCache->nEntry++; pCache->sEntryCache.nEntry++;
} }
_exit: _exit:
...@@ -162,8 +219,8 @@ int32_t metaCacheDrop(SMeta* pMeta, int64_t uid) { ...@@ -162,8 +219,8 @@ int32_t metaCacheDrop(SMeta* pMeta, int64_t uid) {
int32_t code = 0; int32_t code = 0;
SMetaCache* pCache = pMeta->pCache; SMetaCache* pCache = pMeta->pCache;
int32_t iBucket = TABS(uid) % pCache->nBucket; int32_t iBucket = TABS(uid) % pCache->sEntryCache.nBucket;
SMetaCacheEntry** ppEntry = &pCache->aBucket[iBucket]; SMetaCacheEntry** ppEntry = &pCache->sEntryCache.aBucket[iBucket];
while (*ppEntry && (*ppEntry)->info.uid != uid) { while (*ppEntry && (*ppEntry)->info.uid != uid) {
ppEntry = &(*ppEntry)->next; ppEntry = &(*ppEntry)->next;
} }
...@@ -172,8 +229,9 @@ int32_t metaCacheDrop(SMeta* pMeta, int64_t uid) { ...@@ -172,8 +229,9 @@ int32_t metaCacheDrop(SMeta* pMeta, int64_t uid) {
if (pEntry) { if (pEntry) {
*ppEntry = pEntry->next; *ppEntry = pEntry->next;
taosMemoryFree(pEntry); taosMemoryFree(pEntry);
pCache->nEntry--; pCache->sEntryCache.nEntry--;
if (pCache->nEntry < pCache->nBucket / 4 && pCache->nBucket > META_CACHE_BASE_BUCKET) { if (pCache->sEntryCache.nEntry < pCache->sEntryCache.nBucket / 4 &&
pCache->sEntryCache.nBucket > META_CACHE_BASE_BUCKET) {
code = metaRehashCache(pCache, 0); code = metaRehashCache(pCache, 0);
if (code) goto _exit; if (code) goto _exit;
} }
...@@ -189,8 +247,134 @@ int32_t metaCacheGet(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo) { ...@@ -189,8 +247,134 @@ int32_t metaCacheGet(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo) {
int32_t code = 0; int32_t code = 0;
SMetaCache* pCache = pMeta->pCache; SMetaCache* pCache = pMeta->pCache;
int32_t iBucket = TABS(uid) % pCache->nBucket; int32_t iBucket = TABS(uid) % pCache->sEntryCache.nBucket;
SMetaCacheEntry* pEntry = pCache->aBucket[iBucket]; SMetaCacheEntry* pEntry = pCache->sEntryCache.aBucket[iBucket];
while (pEntry && pEntry->info.uid != uid) {
pEntry = pEntry->next;
}
if (pEntry) {
*pInfo = pEntry->info;
} else {
code = TSDB_CODE_NOT_FOUND;
}
return code;
}
static int32_t metaRehashStatsCache(SMetaCache* pCache, int8_t expand) {
int32_t code = 0;
int32_t nBucket;
if (expand) {
nBucket = pCache->sStbStatsCache.nBucket * 2;
} else {
nBucket = pCache->sStbStatsCache.nBucket / 2;
}
SMetaStbStatsEntry** aBucket = (SMetaStbStatsEntry**)taosMemoryCalloc(nBucket, sizeof(SMetaStbStatsEntry*));
if (aBucket == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
// rehash
for (int32_t iBucket = 0; iBucket < pCache->sStbStatsCache.nBucket; iBucket++) {
SMetaStbStatsEntry* pEntry = pCache->sStbStatsCache.aBucket[iBucket];
while (pEntry) {
SMetaStbStatsEntry* pTEntry = pEntry->next;
pEntry->next = aBucket[TABS(pEntry->info.uid) % nBucket];
aBucket[TABS(pEntry->info.uid) % nBucket] = pEntry;
pEntry = pTEntry;
}
}
// final set
taosMemoryFree(pCache->sStbStatsCache.aBucket);
pCache->sStbStatsCache.nBucket = nBucket;
pCache->sStbStatsCache.aBucket = aBucket;
_exit:
return code;
}
int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo) {
int32_t code = 0;
// ASSERT(metaIsWLocked(pMeta));
// search
SMetaCache* pCache = pMeta->pCache;
int32_t iBucket = TABS(pInfo->uid) % pCache->sStbStatsCache.nBucket;
SMetaStbStatsEntry** ppEntry = &pCache->sStbStatsCache.aBucket[iBucket];
while (*ppEntry && (*ppEntry)->info.uid != pInfo->uid) {
ppEntry = &(*ppEntry)->next;
}
if (*ppEntry) { // update
(*ppEntry)->info.ctbNum = pInfo->ctbNum;
} else { // insert
if (pCache->sStbStatsCache.nEntry >= pCache->sStbStatsCache.nBucket) {
code = metaRehashStatsCache(pCache, 1);
if (code) goto _exit;
iBucket = TABS(pInfo->uid) % pCache->sStbStatsCache.nBucket;
}
SMetaStbStatsEntry* pEntryNew = (SMetaStbStatsEntry*)taosMemoryMalloc(sizeof(*pEntryNew));
if (pEntryNew == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pEntryNew->info = *pInfo;
pEntryNew->next = pCache->sStbStatsCache.aBucket[iBucket];
pCache->sStbStatsCache.aBucket[iBucket] = pEntryNew;
pCache->sStbStatsCache.nEntry++;
}
_exit:
return code;
}
int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid) {
int32_t code = 0;
SMetaCache* pCache = pMeta->pCache;
int32_t iBucket = TABS(uid) % pCache->sStbStatsCache.nBucket;
SMetaStbStatsEntry** ppEntry = &pCache->sStbStatsCache.aBucket[iBucket];
while (*ppEntry && (*ppEntry)->info.uid != uid) {
ppEntry = &(*ppEntry)->next;
}
SMetaStbStatsEntry* pEntry = *ppEntry;
if (pEntry) {
*ppEntry = pEntry->next;
taosMemoryFree(pEntry);
pCache->sStbStatsCache.nEntry--;
if (pCache->sStbStatsCache.nEntry < pCache->sStbStatsCache.nBucket / 4 &&
pCache->sStbStatsCache.nBucket > META_CACHE_STATS_BUCKET) {
code = metaRehashStatsCache(pCache, 0);
if (code) goto _exit;
}
} else {
code = TSDB_CODE_NOT_FOUND;
}
_exit:
return code;
}
int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) {
int32_t code = TSDB_CODE_SUCCESS;
SMetaCache* pCache = pMeta->pCache;
int32_t iBucket = TABS(uid) % pCache->sStbStatsCache.nBucket;
SMetaStbStatsEntry* pEntry = pCache->sStbStatsCache.aBucket[iBucket];
while (pEntry && pEntry->info.uid != uid) { while (pEntry && pEntry->info.uid != uid) {
pEntry = pEntry->next; pEntry = pEntry->next;
......
...@@ -1260,3 +1260,32 @@ _exit: ...@@ -1260,3 +1260,32 @@ _exit:
tdbFree(pData); tdbFree(pData);
return code; return code;
} }
int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo) {
int32_t code = 0;
metaRLock(pMeta);
// fast path: search cache
if (metaStatsCacheGet(pMeta, uid, pInfo) == TSDB_CODE_SUCCESS) {
metaULock(pMeta);
goto _exit;
}
// slow path: search TDB
int64_t ctbNum = 0;
vnodeGetCtbNum(pMeta->pVnode, uid, &ctbNum);
metaULock(pMeta);
pInfo->uid = uid;
pInfo->ctbNum = ctbNum;
// upsert the cache
metaWLock(pMeta);
metaStatsCacheUpsert(pMeta, pInfo);
metaULock(pMeta);
_exit:
return code;
}
...@@ -362,6 +362,8 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { ...@@ -362,6 +362,8 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
// update uid index // update uid index
metaUpdateUidIdx(pMeta, &nStbEntry); metaUpdateUidIdx(pMeta, &nStbEntry);
metaStatsCacheDrop(pMeta, nStbEntry.uid);
metaULock(pMeta); metaULock(pMeta);
if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf); if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf);
...@@ -615,6 +617,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { ...@@ -615,6 +617,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
tdbTbDelete(pMeta->pSuidIdx, &e.uid, sizeof(tb_uid_t), &pMeta->txn); tdbTbDelete(pMeta->pSuidIdx, &e.uid, sizeof(tb_uid_t), &pMeta->txn);
// drop schema.db (todo) // drop schema.db (todo)
metaStatsCacheDrop(pMeta, uid);
--pMeta->pVnode->config.vndStats.numOfSTables; --pMeta->pVnode->config.vndStats.numOfSTables;
} }
......
...@@ -504,27 +504,36 @@ static int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num) { ...@@ -504,27 +504,36 @@ static int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num) {
} }
int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num) { int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num) {
SMStbCursor *pCur = metaOpenStbCursor(pVnode->pMeta, 0); SArray *suidList = NULL;
if (!pCur) {
if (!(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
if (vnodeGetStbIdList(pVnode, 0, suidList) < 0) {
qError("vgId:%d, failed to get stb id list error: %s", TD_VID(pVnode), terrstr());
taosArrayDestroy(suidList);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
*num = 0; *num = 0;
while (1) { int64_t arrSize = taosArrayGetSize(suidList);
tb_uid_t id = metaStbCursorNext(pCur); for (int64_t i = 0; i < arrSize; ++i) {
if (id == 0) { tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i);
break;
} SMetaStbStats stats = {0};
metaGetStbStats(pVnode->pMeta, suid, &stats);
int64_t ctbNum = stats.ctbNum;
// vnodeGetCtbNum(pVnode, id, &ctbNum);
int64_t ctbNum = 0;
vnodeGetCtbNum(pVnode, id, &ctbNum);
int numOfCols = 0; int numOfCols = 0;
vnodeGetStbColumnNum(pVnode, id, &numOfCols); vnodeGetStbColumnNum(pVnode, suid, &numOfCols);
*num += ctbNum * (numOfCols - 1); *num += ctbNum * (numOfCols - 1);
} }
metaCloseStbCursor(pCur); taosArrayDestroy(suidList);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册