From 48495592c1b988e0ee46782a3193b4b8c26f63fd Mon Sep 17 00:00:00 2001 From: dapan Date: Sat, 12 Feb 2022 20:03:42 +0800 Subject: [PATCH] feature/qnode --- source/libs/catalog/inc/catalogInt.h | 20 +- source/libs/catalog/src/catalog.c | 425 +++++++++++++++------- source/libs/catalog/test/catalogTests.cpp | 306 ++++++++++++---- 3 files changed, 553 insertions(+), 198 deletions(-) diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 6bfc8d6ca3..f427910758 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -105,7 +105,8 @@ typedef struct SCtgApiStat { } SCtgApiStat; typedef struct SCtgRuntimeStat { - + uint64_t qNum; + uint64_t qDoneNum; } SCtgRuntimeStat; typedef struct SCtgCacheStat { @@ -161,6 +162,7 @@ typedef struct SCatalogMgmt { SCtgQNode *head; SCtgQNode *tail; tsem_t sem; + uint64_t qRemainNum; pthread_t updateThread; SHashObj *pCluster; //key: clusterId, value: SCatalog* SCatalogStat stat; @@ -170,6 +172,18 @@ typedef struct SCatalogMgmt { typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); typedef int32_t (*ctgActFunc)(SCtgMetaAction *); +typedef struct SCtgAction { + int32_t actId; + char name[32]; + ctgActFunc func; +} SCtgAction; + +#define CTG_QUEUE_ADD() atomic_add_fetch_64(&gCtgMgmt.qRemainNum, 1) +#define CTG_QUEUE_SUB() atomic_sub_fetch_64(&gCtgMgmt.qRemainNum, 1) + +#define CTG_STAT_ADD(n) qError("done:%" PRId64, atomic_add_fetch_64(&(n), 1)) +#define CTG_STAT_SUB(n) atomic_sub_fetch_64(&(n), 1) + #define CTG_IS_META_NULL(type) ((type) == META_TYPE_NULL_TABLE) #define CTG_IS_META_CTABLE(type) ((type) == META_TYPE_CTABLE) #define CTG_IS_META_TABLE(type) ((type) == META_TYPE_TABLE) @@ -236,8 +250,8 @@ typedef int32_t (*ctgActFunc)(SCtgMetaAction *); #define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) #define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) -#define CTG_API_ENTER() do { CTG_API_DEBUG("enter %s", __FUNCTION__); CTG_LOCK(CTG_READ, &ctgMgmt.lock); if (atomic_load_8(&ctgMgmt.exit)) { CTG_UNLOCK(CTG_READ, &ctgMgmt.lock); CTG_RET(TSDB_CODE_CTG_OUT_OF_SERVICE); } } while (0) -#define CTG_API_LEAVE(c) do { int32_t __code = c; CTG_UNLOCK(CTG_READ, &ctgMgmt.lock); CTG_API_DEBUG("leave %s", __FUNCTION__); CTG_RET(__code); } while (0) +#define CTG_API_LEAVE(c) do { int32_t __code = c; CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); CTG_API_DEBUG("CTG API leave %s", __FUNCTION__); CTG_RET(__code); } while (0) +#define CTG_API_ENTER() do { CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); CTG_LOCK(CTG_READ, &gCtgMgmt.lock); if (atomic_load_8(&gCtgMgmt.exit)) { CTG_API_LEAVE(TSDB_CODE_CTG_OUT_OF_SERVICE); } } while (0) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 5195a0adf8..732d3ab111 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -24,11 +24,69 @@ int32_t ctgActRemoveDB(SCtgMetaAction *action); int32_t ctgActRemoveStb(SCtgMetaAction *action); int32_t ctgActRemoveTbl(SCtgMetaAction *action); -SCatalogMgmt ctgMgmt = {0}; - +SCatalogMgmt gCtgMgmt = {0}; SCtgDebug gCTGDebug = {0}; +SCtgAction gCtgAction[CTG_ACT_MAX] = {{ + CTG_ACT_UPDATE_VG, + "update vgInfo", + ctgActUpdateVg + }, + { + CTG_ACT_UPDATE_TBL, + "update tbMeta", + ctgActUpdateTbl + }, + { + CTG_ACT_REMOVE_DB, + "remove DB", + ctgActRemoveDB + }, + { + CTG_ACT_REMOVE_STB, + "remove stbMeta", + ctgActRemoveStb + }, + { + CTG_ACT_REMOVE_TBL, + "remove tbMeta", + ctgActRemoveTbl + } +}; + +int32_t ctgDbgEnableDebug(char *option) { + if (0 == strcasecmp(option, "lock")) { + gCTGDebug.lockDebug = true; + qDebug("lock debug enabled"); + return TSDB_CODE_SUCCESS; + } + + if (0 == strcasecmp(option, "cache")) { + gCTGDebug.cacheDebug = true; + qDebug("cache debug enabled"); + return TSDB_CODE_SUCCESS; + } + + if (0 == strcasecmp(option, "api")) { + gCTGDebug.apiDebug = true; + qDebug("api debug enabled"); + return TSDB_CODE_SUCCESS; + } + + qError("invalid debug option:%s", option); + + return TSDB_CODE_CTG_INTERNAL_ERROR; +} + +int32_t ctgDbgGetStatNum(char *option, void *res) { + if (0 == strcasecmp(option, "runtime.qDoneNum")) { + *(uint64_t *)res = atomic_load_64(&gCtgMgmt.stat.runtime.qDoneNum); + return TSDB_CODE_SUCCESS; + } -ctgActFunc ctgActFuncs[] = {ctgActUpdateVg, ctgActUpdateTbl, ctgActRemoveDB, ctgActRemoveStb, ctgActRemoveTbl}; + qError("invalid stat option:%s", option); + + return TSDB_CODE_CTG_INTERNAL_ERROR; +} int32_t ctgDbgGetTbMetaNum(SCtgDBCache *dbCache) { return dbCache->tbCache.metaCache ? (int32_t)taosHashGetSize(dbCache->tbCache.metaCache) : 0; @@ -128,6 +186,45 @@ void ctgDbgShowClusterCache(SCatalog* pCtg) { ctgDbgShowDBCache(pCtg->dbCache); } + + +void ctgPopAction(SCtgMetaAction **action) { + SCtgQNode *orig = gCtgMgmt.head; + + SCtgQNode *node = gCtgMgmt.head->next; + gCtgMgmt.head = gCtgMgmt.head->next; + + CTG_QUEUE_SUB(); + + tfree(orig); + + *action = &node->action; +} + + +int32_t ctgPushAction(SCtgMetaAction *action) { + SCtgQNode *node = calloc(1, sizeof(SCtgQNode)); + if (NULL == node) { + qError("calloc %d failed", (int32_t)sizeof(SCtgQNode)); + CTG_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + node->action = *action; + + CTG_LOCK(CTG_WRITE, &gCtgMgmt.qlock); + gCtgMgmt.tail->next = node; + gCtgMgmt.tail = node; + CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.qlock); + + CTG_QUEUE_ADD(); + //CTG_STAT_ADD(gCtgMgmt.stat.runtime.qNum); + + tsem_post(&gCtgMgmt.sem); + + return TSDB_CODE_SUCCESS; +} + + void ctgFreeMetaRent(SCtgRentMgmt *mgmt) { if (NULL == mgmt->slots) { return; @@ -210,16 +307,29 @@ void ctgFreeHandle(SCatalog* pCtg) { } -int32_t ctgAcquireVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) { +int32_t ctgAcquireVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) { CTG_LOCK(CTG_READ, &dbCache->vgLock); + if (dbCache->deleted) { + CTG_UNLOCK(CTG_READ, &dbCache->vgLock); + + ctgDebug("db is dropping, dbId:%"PRIx64, dbCache->dbId); + + *inCache = false; + return TSDB_CODE_SUCCESS; + } + + if (NULL == dbCache->vgInfo) { CTG_UNLOCK(CTG_READ, &dbCache->vgLock); + *inCache = false; ctgDebug("db vgInfo is empty, dbId:%"PRIx64, dbCache->dbId); return TSDB_CODE_SUCCESS; } + *inCache = true; + return TSDB_CODE_SUCCESS; } @@ -302,12 +412,11 @@ int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCac return TSDB_CODE_SUCCESS; } - ctgAcquireVgInfo(pCtg, dbCache); - if (NULL == dbCache->vgInfo) { + ctgAcquireVgInfo(pCtg, dbCache, inCache); + if (!(*inCache)) { ctgReleaseDBCache(pCtg, dbCache); *pCache = NULL; - *inCache = false; return TSDB_CODE_SUCCESS; } @@ -325,7 +434,7 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtE char *msg = NULL; int32_t msgLen = 0; - ctgDebug("try to get db vgroup from mnode, db:%s", input->db); + ctgDebug("try to get db vgInfo from mnode, dbFName:%s", input->db); int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)](input, &msg, 0, &msgLen); if (code) { @@ -358,6 +467,8 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtE CTG_ERR_RET(code); } + ctgDebug("Got db vgInfo from mnode, dbFName:%s", input->db); + return TSDB_CODE_SUCCESS; } @@ -381,7 +492,7 @@ int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); if (NULL == tbMeta) { - taosHashRelease(pCtg->dbCache, dbCache); + ctgReleaseDBCache(pCtg, dbCache); *exist = 0; ctgDebug("tbmeta not in cache, dbFName:%s, tbName:%s", dbFName, tbName); @@ -390,7 +501,7 @@ int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, *exist = 1; - taosHashRelease(pCtg->dbCache, dbCache); + ctgReleaseDBCache(pCtg, dbCache); ctgDebug("tbmeta is in cache, dbFName:%s, tbName:%s", dbFName, tbName); @@ -414,7 +525,6 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable ctgAcquireDBCache(pCtg, dbFName, &dbCache); if (NULL == dbCache) { *exist = 0; - ctgWarn("no db cache, dbFName:%s, tbName:%s", dbFName, pTableName->tname); return TSDB_CODE_SUCCESS; } @@ -486,12 +596,12 @@ int32_t ctgGetTableTypeFromCache(SCatalog* pCtg, const SName* pTableName, int32_ return TSDB_CODE_SUCCESS; } - char dbName[TSDB_DB_FNAME_LEN] = {0}; - tNameGetFullDbName(pTableName, dbName); + char dbFName[TSDB_DB_FNAME_LEN] = {0}; + tNameGetFullDbName(pTableName, dbFName); - SCtgDBCache *dbCache = taosHashAcquire(pCtg->dbCache, dbName, strlen(dbName)); + SCtgDBCache *dbCache = NULL; + ctgAcquireDBCache(pCtg, dbFName, &dbCache); if (NULL == dbCache) { - ctgInfo("db not in cache, dbFName:%s", dbName); return TSDB_CODE_SUCCESS; } @@ -500,8 +610,8 @@ int32_t ctgGetTableTypeFromCache(SCatalog* pCtg, const SName* pTableName, int32_ if (NULL == pTableMeta) { CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); - ctgWarn("tbl not in cache, dbFName:%s, tbName:%s", dbName, pTableName->tname); - taosHashRelease(pCtg->dbCache, dbCache); + ctgWarn("tbl not in cache, dbFName:%s, tbName:%s", dbFName, pTableName->tname); + ctgReleaseDBCache(pCtg, dbCache); return TSDB_CODE_SUCCESS; } @@ -512,9 +622,9 @@ int32_t ctgGetTableTypeFromCache(SCatalog* pCtg, const SName* pTableName, int32_ CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); - taosHashRelease(pCtg->dbCache, dbCache); + ctgReleaseDBCache(pCtg, dbCache); - ctgDebug("Got tbtype from cache, dbFName:%s, tbName:%s, type:%d", dbName, pTableName->tname, *tbType); + ctgDebug("Got tbtype from cache, dbFName:%s, tbName:%s, type:%d", dbFName, pTableName->tname, *tbType); return TSDB_CODE_SUCCESS; } @@ -940,15 +1050,15 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) { SCtgDBCache newDBCache = {0}; newDBCache.dbId = dbId; - newDBCache.tbCache.metaCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + newDBCache.tbCache.metaCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (NULL == newDBCache.tbCache.metaCache) { - ctgError("taosHashInit %d metaCache failed", ctgMgmt.cfg.maxTblCacheNum); + ctgError("taosHashInit %d metaCache failed", gCtgMgmt.cfg.maxTblCacheNum); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } - newDBCache.tbCache.stbCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); + newDBCache.tbCache.stbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); if (NULL == newDBCache.tbCache.stbCache) { - ctgError("taosHashInit %d stbCache failed", ctgMgmt.cfg.maxTblCacheNum); + ctgError("taosHashInit %d stbCache failed", gCtgMgmt.cfg.maxTblCacheNum); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } @@ -1002,6 +1112,10 @@ void ctgRemoveStbRent(SCatalog* pCtg, SCtgTbMetaCache *cache) { int32_t ctgRemoveDB(SCatalog* pCtg, SCtgDBCache *dbCache, const char* dbFName) { + uint64_t dbId = dbCache->dbId; + + ctgInfo("start to remove db from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId); + atomic_store_8(&dbCache->deleted, 1); ctgRemoveStbRent(pCtg, &dbCache->tbCache); @@ -1018,6 +1132,8 @@ int32_t ctgRemoveDB(SCatalog* pCtg, SCtgDBCache *dbCache, const char* dbFName) { ctgInfo("taosHashRemove from dbCache failed, may be removed, dbFName:%s", dbFName); CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED); } + + ctgInfo("db removed from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbId); return TSDB_CODE_SUCCESS; } @@ -1160,6 +1276,8 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } + ctgDebug("tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType); + if (!isStb) { CTG_UNLOCK(CTG_READ, &tbCache->metaLock); return TSDB_CODE_SUCCESS; @@ -1183,7 +1301,7 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui CTG_UNLOCK(CTG_READ, &tbCache->metaLock); - ctgDebug("meta updated to cache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType); + ctgDebug("stb updated to stbCache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType); SSTableMetaVersion metaRent = {.dbId = dbId, .suid = meta->suid, .sversion = meta->sversion, .tversion = meta->tversion}; strcpy(metaRent.dbFName, dbFName); @@ -1193,6 +1311,45 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui return TSDB_CODE_SUCCESS; } +int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) { + *dst = malloc(sizeof(SDBVgInfo)); + if (NULL == *dst) { + qError("malloc %d failed", (int32_t)sizeof(SDBVgInfo)); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + memcpy(*dst, src, sizeof(SDBVgInfo)); + + size_t hashSize = taosHashGetSize(src->vgHash); + (*dst)->vgHash = taosHashInit(hashSize, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (NULL == (*dst)->vgHash) { + qError("taosHashInit %d failed", (int32_t)hashSize); + tfree(*dst); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + int32_t *vgId = NULL; + void *pIter = taosHashIterate(src->vgHash, NULL); + while (pIter) { + taosHashGetKey(pIter, (void **)&vgId, NULL); + + if (taosHashPut((*dst)->vgHash, (void *)vgId, sizeof(int32_t), pIter, sizeof(SVgroupInfo))) { + qError("taosHashPut failed, hashSize:%d", (int32_t)hashSize); + taosHashCancelIterate(src->vgHash, pIter); + taosHashCleanup((*dst)->vgHash); + tfree(*dst); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + pIter = taosHashIterate(src->vgHash, pIter); + } + + + return TSDB_CODE_SUCCESS; +} + + + int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SCtgDBCache** dbCache, SDBVgInfo **pInfo) { bool inCache = false; int32_t code = 0; @@ -1209,23 +1366,39 @@ int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const tstrncpy(input.db, dbFName, tListLen(input.db)); input.vgVersion = CTG_DEFAULT_INVALID_VERSION; - while (true) { - CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pRpc, pMgmtEps, &input, &DbOut)); - - code = ctgUpdateDBVgInfo(pCtg, dbFName, DbOut.dbId, &DbOut.dbVgroup); - if (code && DbOut.dbVgroup) { - *pInfo = DbOut.dbVgroup; - return TSDB_CODE_SUCCESS; - } - - CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache, &inCache)); + CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pRpc, pMgmtEps, &input, &DbOut)); - if (inCache) { - return TSDB_CODE_SUCCESS; - } + CTG_ERR_JRET(ctgCloneVgInfo(DbOut.dbVgroup, pInfo)); + + SCtgMetaAction action= {.act = CTG_ACT_UPDATE_VG}; + SCtgUpdateVgMsg *msg = malloc(sizeof(SCtgUpdateVgMsg)); + if (NULL == msg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg)); + ctgFreeVgInfo(DbOut.dbVgroup); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } + strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); + msg->pCtg = pCtg; + msg->dbId = DbOut.dbId; + msg->dbInfo = DbOut.dbVgroup; + + action.data = msg; + + CTG_ERR_JRET(ctgPushAction(&action)); + + ctgDebug("action [%s] added into queue", gCtgAction[action.act].name); + return TSDB_CODE_SUCCESS; + +_return: + + tfree(*pInfo); + tfree(msg); + + *pInfo = DbOut.dbVgroup; + + CTG_RET(code); } @@ -1253,36 +1426,6 @@ int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput) return TSDB_CODE_SUCCESS; } -void ctgPopAction(SCtgMetaAction **action) { - SCtgQNode *orig = ctgMgmt.head; - - SCtgQNode *node = ctgMgmt.head->next; - ctgMgmt.head = ctgMgmt.head->next; - - tfree(orig); - - *action = &node->action; -} - - -int32_t ctgPushAction(SCtgMetaAction *action) { - SCtgQNode *node = calloc(1, sizeof(SCtgQNode)); - if (NULL == node) { - qError("calloc %d failed", (int32_t)sizeof(SCtgQNode)); - CTG_RET(TSDB_CODE_CTG_MEM_ERROR); - } - - node->action = *action; - - CTG_LOCK(CTG_WRITE, &ctgMgmt.qlock); - ctgMgmt.tail->next = node; - ctgMgmt.tail = node; - CTG_UNLOCK(CTG_WRITE, &ctgMgmt.qlock); - - tsem_post(&ctgMgmt.sem); - - return TSDB_CODE_SUCCESS; -} int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable, STableMetaOutput **pOutput) { @@ -1295,6 +1438,7 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgm CTG_ERR_RET(catalogGetTableHashVgroup(pCtg, pTransporter, pMgmtEps, pTableName, &vgroupInfo)); + SCtgUpdateTblMsg *msg = NULL; STableMetaOutput moutput = {0}; STableMetaOutput *output = malloc(sizeof(STableMetaOutput)); if (NULL == output) { @@ -1303,7 +1447,7 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgm } if (CTG_IS_STABLE(isSTable)) { - ctgDebug("will renew tbmeta, supposed to be stb, tbName:%s", tNameGetTableName(pTableName)); + ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s", tNameGetTableName(pTableName)); // if get from mnode failed, will not try vnode CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCtg, pTransporter, pMgmtEps, pTableName, output)); @@ -1312,13 +1456,13 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgm CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCtg, pTransporter, pMgmtEps, pTableName, &vgroupInfo, output)); } } else { - ctgDebug("will renew tbmeta, not supposed to be stb, tbName:%s, isStable:%d", tNameGetTableName(pTableName), isSTable); + ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, isStable:%d", tNameGetTableName(pTableName), isSTable); // if get from vnode failed or no table meta, will not try mnode CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCtg, pTransporter, pMgmtEps, pTableName, &vgroupInfo, output)); if (CTG_IS_META_TABLE(output->metaType) && TSDB_SUPER_TABLE == output->tbMeta->tableType) { - ctgDebug("will continue to renew tbmeta since got stb, tbName:%s, metaType:%d", tNameGetTableName(pTableName), output->metaType); + ctgDebug("will continue to refresh tbmeta since got stb, tbName:%s, metaType:%d", tNameGetTableName(pTableName), output->metaType); tfree(output->tbMeta); @@ -1345,16 +1489,22 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgm } if (CTG_IS_META_NULL(output->metaType)) { - ctgError("no tablemeta got, tbNmae:%s", tNameGetTableName(pTableName)); + ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(pTableName)); CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST); } + if (CTG_IS_META_TABLE(output->metaType)) { + ctgDebug("tbmeta got, dbFName:%s, tbName:%s, tbType:%d", output->dbFName, output->tbName, output->tbMeta->tableType); + } else { + ctgDebug("tbmeta got, dbFName:%s, tbName:%s, tbType:%d, stbMetaGot:%d", output->dbFName, output->ctbName, output->ctbMeta.tableType, CTG_IS_META_BOTH(output->metaType)); + } + if (pOutput) { CTG_ERR_JRET(ctgCloneMetaOutput(output, pOutput)); } SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL}; - SCtgUpdateTblMsg *msg = malloc(sizeof(SCtgUpdateTblMsg)); + msg = malloc(sizeof(SCtgUpdateTblMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg)); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); @@ -1367,11 +1517,14 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgm CTG_ERR_JRET(ctgPushAction(&action)); + ctgDebug("action [%s] added into queue", gCtgAction[action.act].name); + return TSDB_CODE_SUCCESS; _return: tfree(output->tbMeta); + tfree(output); tfree(msg); CTG_RET(code); @@ -1440,6 +1593,10 @@ _return: tfree(output); + if (*pTableMeta) { + ctgDebug("tbmeta returned, tbName:%s, tbType:%d", pTableName->tname, (*pTableMeta)->tableType); + } + CTG_RET(code); } @@ -1465,15 +1622,13 @@ int32_t ctgActRemoveDB(SCtgMetaAction *action) { SCatalog* pCtg = msg->pCtg; SCtgDBCache *dbCache = NULL; - ctgAcquireDBCache(msg->pCtg, msg->dbFName, &dbCache); + ctgGetDBCache(msg->pCtg, msg->dbFName, &dbCache); if (NULL == dbCache) { - ctgInfo("db not exist in cache, may be removed, dbFName:%s", msg->dbFName); goto _return; } if (dbCache->dbId != msg->dbId) { ctgInfo("dbId already updated, dbFName:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, msg->dbFName, dbCache->dbId, msg->dbId); - ctgReleaseDBCache(msg->pCtg, dbCache); goto _return; } @@ -1496,7 +1651,7 @@ int32_t ctgActUpdateTbl(SCtgMetaAction *action) { if ((!CTG_IS_META_CTABLE(output->metaType)) && NULL == output->tbMeta) { ctgError("no valid tbmeta got from meta rsp, dbFName:%s, tbName:%s", output->dbFName, output->tbName); - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } if (CTG_IS_META_BOTH(output->metaType) && TSDB_SUPER_TABLE != output->tbMeta->tableType) { @@ -1522,10 +1677,11 @@ int32_t ctgActUpdateTbl(SCtgMetaAction *action) { _return: - if (dbCache) { - taosHashRelease(pCtg->dbCache, dbCache); + if (output) { + tfree(output->tbMeta); + tfree(output); } - + tfree(msg); CTG_RET(code); @@ -1591,22 +1747,26 @@ void* ctgUpdateThreadFunc(void* param) { qInfo("catalog update thread started"); - CTG_LOCK(CTG_READ, &ctgMgmt.lock); + CTG_LOCK(CTG_READ, &gCtgMgmt.lock); while (true) { - tsem_wait(&ctgMgmt.sem); + tsem_wait(&gCtgMgmt.sem); - if (atomic_load_8(&ctgMgmt.exit)) { + if (atomic_load_8(&gCtgMgmt.exit)) { break; } SCtgMetaAction *action = NULL; ctgPopAction(&action); - (*ctgActFuncs[action->act])(action); + qDebug("process %s action", gCtgAction[action->act].name); + + (*gCtgAction[action->act].func)(action); + + CTG_STAT_ADD(gCtgMgmt.stat.runtime.qDoneNum); } - CTG_UNLOCK(CTG_READ, &ctgMgmt.lock); + CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); qInfo("catalog update thread stopped"); @@ -1619,7 +1779,7 @@ int32_t ctgStartUpdateThread() { pthread_attr_init(&thAttr); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); - if (pthread_create(&ctgMgmt.updateThread, &thAttr, ctgUpdateThreadFunc, NULL) != 0) { + if (pthread_create(&gCtgMgmt.updateThread, &thAttr, ctgUpdateThreadFunc, NULL) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); CTG_ERR_RET(terrno); } @@ -1630,56 +1790,56 @@ int32_t ctgStartUpdateThread() { int32_t catalogInit(SCatalogCfg *cfg) { - if (ctgMgmt.pCluster) { + if (gCtgMgmt.pCluster) { qError("catalog already initialized"); CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } - atomic_store_8(&ctgMgmt.exit, false); + atomic_store_8(&gCtgMgmt.exit, false); if (cfg) { - memcpy(&ctgMgmt.cfg, cfg, sizeof(*cfg)); + memcpy(&gCtgMgmt.cfg, cfg, sizeof(*cfg)); - if (ctgMgmt.cfg.maxDBCacheNum == 0) { - ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER; + if (gCtgMgmt.cfg.maxDBCacheNum == 0) { + gCtgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER; } - if (ctgMgmt.cfg.maxTblCacheNum == 0) { - ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TBLMETA_NUMBER; + if (gCtgMgmt.cfg.maxTblCacheNum == 0) { + gCtgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TBLMETA_NUMBER; } - if (ctgMgmt.cfg.dbRentSec == 0) { - ctgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND; + if (gCtgMgmt.cfg.dbRentSec == 0) { + gCtgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND; } - if (ctgMgmt.cfg.stbRentSec == 0) { - ctgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND; + if (gCtgMgmt.cfg.stbRentSec == 0) { + gCtgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND; } } else { - ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER; - ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TBLMETA_NUMBER; - ctgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND; - ctgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND; + gCtgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER; + gCtgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TBLMETA_NUMBER; + gCtgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND; + gCtgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND; } - ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - if (NULL == ctgMgmt.pCluster) { + gCtgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + if (NULL == gCtgMgmt.pCluster) { qError("taosHashInit %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } CTG_ERR_RET(ctgStartUpdateThread()); - tsem_init(&ctgMgmt.sem, 0, 0); + tsem_init(&gCtgMgmt.sem, 0, 0); - ctgMgmt.head = calloc(1, sizeof(SCtgQNode)); - if (NULL == ctgMgmt.head) { + gCtgMgmt.head = calloc(1, sizeof(SCtgQNode)); + if (NULL == gCtgMgmt.head) { qError("calloc %d failed", (int32_t)sizeof(SCtgQNode)); CTG_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - ctgMgmt.tail = ctgMgmt.head; + gCtgMgmt.tail = gCtgMgmt.head; - qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", ctgMgmt.cfg.maxDBCacheNum, ctgMgmt.cfg.maxTblCacheNum, ctgMgmt.cfg.dbRentSec, ctgMgmt.cfg.stbRentSec); + qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", gCtgMgmt.cfg.maxDBCacheNum, gCtgMgmt.cfg.maxTblCacheNum, gCtgMgmt.cfg.dbRentSec, gCtgMgmt.cfg.stbRentSec); return TSDB_CODE_SUCCESS; } @@ -1689,7 +1849,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } - if (NULL == ctgMgmt.pCluster) { + if (NULL == gCtgMgmt.pCluster) { qError("catalog cluster cache are not ready, clusterId:%"PRIx64, clusterId); CTG_ERR_RET(TSDB_CODE_CTG_NOT_READY); } @@ -1698,7 +1858,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) { SCatalog *clusterCtg = NULL; while (true) { - SCatalog **ctg = (SCatalog **)taosHashGet(ctgMgmt.pCluster, (char*)&clusterId, sizeof(clusterId)); + SCatalog **ctg = (SCatalog **)taosHashGet(gCtgMgmt.pCluster, (char*)&clusterId, sizeof(clusterId)); if (ctg && (*ctg)) { *catalogHandle = *ctg; @@ -1714,22 +1874,22 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) { clusterCtg->clusterId = clusterId; - CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, ctgMgmt.cfg.dbRentSec, CTG_RENT_DB)); - CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stbRent, ctgMgmt.cfg.stbRentSec, CTG_RENT_STABLE)); + CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB)); + CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE)); - clusterCtg->dbCache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + clusterCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); if (NULL == clusterCtg->dbCache) { qError("taosHashInit %d dbCache failed", CTG_DEFAULT_CACHE_DB_NUMBER); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } - SHashObj *metaCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + SHashObj *metaCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (NULL == metaCache) { - qError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum); + qError("taosHashInit failed, num:%d", gCtgMgmt.cfg.maxTblCacheNum); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } - code = taosHashPut(ctgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES); + code = taosHashPut(gCtgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES); if (code) { if (HASH_NODE_EXIST(code)) { ctgFreeHandle(clusterCtg); @@ -1761,7 +1921,7 @@ void catalogFreeHandle(SCatalog* pCtg) { return; } - if (taosHashRemove(ctgMgmt.pCluster, &pCtg->clusterId, sizeof(pCtg->clusterId))) { + if (taosHashRemove(gCtgMgmt.pCluster, &pCtg->clusterId, sizeof(pCtg->clusterId))) { ctgWarn("taosHashRemove from cluster failed, may already be freed, clusterId:%"PRIx64, pCtg->clusterId); return; } @@ -1793,8 +1953,9 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers CTG_API_LEAVE(TSDB_CODE_SUCCESS); } - ctgAcquireVgInfo(pCtg, dbCache); - if (NULL == dbCache->vgInfo) { + bool inCache = false; + ctgAcquireVgInfo(pCtg, dbCache, &inCache); + if (!inCache) { ctgReleaseDBCache(pCtg, dbCache); *version = CTG_DEFAULT_INVALID_VERSION; @@ -1877,6 +2038,8 @@ int32_t catalogUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId CTG_ERR_JRET(ctgPushAction(&action)); + ctgDebug("action [%s] added into queue", gCtgAction[action.act].name); + CTG_API_LEAVE(code); _return: @@ -1920,6 +2083,8 @@ int32_t catalogRemoveDB(SCatalog* pCtg, const char* dbFName, uint64_t dbId) { CTG_ERR_JRET(ctgPushAction(&action)); + ctgDebug("action [%s] added into queue", gCtgAction[action.act].name); + CTG_API_LEAVE(TSDB_CODE_SUCCESS); _return: @@ -1959,6 +2124,8 @@ int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, CTG_ERR_JRET(ctgPushAction(&action)); + ctgDebug("action [%s] added into queue", gCtgAction[action.act].name); + CTG_API_LEAVE(TSDB_CODE_SUCCESS); _return: @@ -2019,6 +2186,8 @@ int32_t catalogUpdateSTableMeta(SCatalog* pCtg, STableMetaRsp *rspMsg) { CTG_ERR_JRET(ctgPushAction(&action)); + ctgDebug("action [%s] added into queue", gCtgAction[action.act].name); + CTG_API_LEAVE(code); _return: @@ -2059,6 +2228,7 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgm SVgroupInfo vgroupInfo = {0}; SCtgDBCache* dbCache = NULL; SArray *vgList = NULL; + SDBVgInfo *vgInfo = NULL; *pVgList = NULL; @@ -2068,7 +2238,6 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgm tNameGetFullDbName(pTableName, db); SHashObj *vgHash = NULL; - SDBVgInfo *vgInfo = NULL; CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, db, false, &dbCache, &vgInfo)); if (dbCache) { @@ -2254,16 +2423,18 @@ int32_t catalogGetExpiredDBs(SCatalog* pCtg, SDbVgVersion **dbs, uint32_t *num) void catalogDestroy(void) { qInfo("start to destroy catalog"); - if (NULL == ctgMgmt.pCluster || atomic_load_8(&ctgMgmt.exit)) { + if (NULL == gCtgMgmt.pCluster || atomic_load_8(&gCtgMgmt.exit)) { return; } - atomic_store_8(&ctgMgmt.exit, true); + atomic_store_8(&gCtgMgmt.exit, true); + + tsem_post(&gCtgMgmt.sem); - CTG_LOCK(CTG_WRITE, &ctgMgmt.lock); + CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock); SCatalog *pCtg = NULL; - void *pIter = taosHashIterate(ctgMgmt.pCluster, NULL); + void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL); while (pIter) { pCtg = *(SCatalog **)pIter; @@ -2271,13 +2442,13 @@ void catalogDestroy(void) { catalogFreeHandle(pCtg); } - pIter = taosHashIterate(ctgMgmt.pCluster, pIter); + pIter = taosHashIterate(gCtgMgmt.pCluster, pIter); } - taosHashCleanup(ctgMgmt.pCluster); - ctgMgmt.pCluster = NULL; + taosHashCleanup(gCtgMgmt.pCluster); + gCtgMgmt.pCluster = NULL; - CTG_UNLOCK(CTG_WRITE, &ctgMgmt.lock); + CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock); qInfo("catalog destroyed"); } diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index fc97040ffb..3db85b11d0 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -41,11 +41,23 @@ extern "C" int32_t ctgGetTableMetaFromCache(struct SCatalog *pCatalog, const SNa int32_t *exist); extern "C" int32_t ctgDbgGetClusterCacheNum(struct SCatalog* pCatalog, int32_t type); extern "C" int32_t ctgActUpdateTbl(SCtgMetaAction *action); +extern "C" int32_t ctgDbgEnableDebug(char *option); +extern "C" int32_t ctgDbgGetStatNum(char *option, void *res); -void ctgTestSetPrepareTableMeta(); -void ctgTestSetPrepareCTableMeta(); -void ctgTestSetPrepareSTableMeta(); -void ctgTestSetPrepareMultiSTableMeta(); +void ctgTestSetRspTableMeta(); +void ctgTestSetRspCTableMeta(); +void ctgTestSetRspSTableMeta(); +void ctgTestSetRspMultiSTableMeta(); + +extern "C" SCatalogMgmt gCtgMgmt; + +enum { + CTGT_RSP_VGINFO = 1, + CTGT_RSP_TBMETA, + CTGT_RSP_CTBMETA, + CTGT_RSP_STBMETA, + CTGT_RSP_MSTBMETA, +}; bool ctgTestStop = false; bool ctgTestEnableSleep = false; @@ -69,6 +81,9 @@ char *ctgTestTablename = "table1"; char *ctgTestCTablename = "ctable1"; char *ctgTestSTablename = "stable1"; +int32_t ctgTestRspFunc[10] = {0}; +int32_t ctgTestRspIdx = 0; + void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) { SCreateDbReq *pReq = (SCreateDbReq *)rpcMallocCont(sizeof(SCreateDbReq)); strcpy(pReq->db, "1.db1"); @@ -111,6 +126,8 @@ void ctgTestInitLogFile() { tsAsyncLog = 0; qDebugFlag = 159; + ctgDbgEnableDebug("api"); + char temp[128] = {0}; sprintf(temp, "%s/%s", tsLogDir, defaultLogFileNamePrefix); if (taosInitLog(temp, tsNumOfLogLines, maxLogFileNum) < 0) { @@ -250,7 +267,7 @@ void ctgTestBuildSTableMetaRsp(STableMetaRsp *rspMsg) { } -void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { +void ctgTestRspDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { SUseDbRsp *rspMsg = NULL; // todo pRsp->code = 0; @@ -286,7 +303,7 @@ void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM return; } -void ctgTestPrepareTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { +void ctgTestRspTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { STableMetaRsp *rspMsg = NULL; // todo pRsp->code = 0; @@ -322,7 +339,7 @@ void ctgTestPrepareTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM return; } -void ctgTestPrepareCTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { +void ctgTestRspCTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { STableMetaRsp *rspMsg = NULL; // todo pRsp->code = 0; @@ -365,7 +382,7 @@ void ctgTestPrepareCTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc return; } -void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { +void ctgTestRspSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { STableMetaRsp *rspMsg = NULL; // todo pRsp->code = 0; @@ -408,7 +425,7 @@ void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc return; } -void ctgTestPrepareMultiSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { +void ctgTestRspMultiSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { STableMetaRsp *rspMsg = NULL; // todo static int32_t idx = 1; @@ -454,151 +471,193 @@ void ctgTestPrepareMultiSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, return; } -void ctgTestPrepareDbVgroupsAndNormalMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { - ctgTestPrepareDbVgroups(shandle, pEpSet, pMsg, pRsp); - ctgTestSetPrepareTableMeta(); +void ctgTestRspByIdx(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + switch (ctgTestRspFunc[ctgTestRspIdx]) { + case CTGT_RSP_VGINFO: + ctgTestRspDbVgroups(shandle, pEpSet, pMsg, pRsp); + break; + case CTGT_RSP_TBMETA: + ctgTestRspTableMeta(shandle, pEpSet, pMsg, pRsp); + break; + case CTGT_RSP_CTBMETA: + ctgTestRspCTableMeta(shandle, pEpSet, pMsg, pRsp); + break; + case CTGT_RSP_STBMETA: + ctgTestRspSTableMeta(shandle, pEpSet, pMsg, pRsp); + break; + case CTGT_RSP_MSTBMETA: + ctgTestRspMultiSTableMeta(shandle, pEpSet, pMsg, pRsp); + break; + default: + break; + } + + ctgTestRspIdx++; + + return; +} + + +void ctgTestRspDbVgroupsAndNormalMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + ctgTestRspDbVgroups(shandle, pEpSet, pMsg, pRsp); + + ctgTestSetRspTableMeta(); return; } -void ctgTestPrepareDbVgroupsAndChildMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { - ctgTestPrepareDbVgroups(shandle, pEpSet, pMsg, pRsp); +void ctgTestRspDbVgroupsAndChildMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + ctgTestRspDbVgroups(shandle, pEpSet, pMsg, pRsp); - ctgTestSetPrepareCTableMeta(); + ctgTestSetRspCTableMeta(); return; } -void ctgTestPrepareDbVgroupsAndSuperMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { - ctgTestPrepareDbVgroups(shandle, pEpSet, pMsg, pRsp); +void ctgTestRspDbVgroupsAndSuperMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + ctgTestRspDbVgroups(shandle, pEpSet, pMsg, pRsp); - ctgTestSetPrepareSTableMeta(); + ctgTestSetRspSTableMeta(); return; } -void ctgTestPrepareDbVgroupsAndMultiSuperMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { - ctgTestPrepareDbVgroups(shandle, pEpSet, pMsg, pRsp); +void ctgTestRspDbVgroupsAndMultiSuperMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + ctgTestRspDbVgroups(shandle, pEpSet, pMsg, pRsp); - ctgTestSetPrepareMultiSTableMeta(); + ctgTestSetRspMultiSTableMeta(); return; } -void ctgTestSetPrepareDbVgroups() { +void ctgTestSetRspDbVgroups() { + static Stub stub; + stub.set(rpcSendRecv, ctgTestRspDbVgroups); + { + AddrAny any("libtransport.so"); + std::map result; + any.get_global_func_addr_dynsym("^rpcSendRecv$", result); + for (const auto &f : result) { + stub.set(f.second, ctgTestRspDbVgroups); + } + } +} + +void ctgTestSetRspTableMeta() { static Stub stub; - stub.set(rpcSendRecv, ctgTestPrepareDbVgroups); + stub.set(rpcSendRecv, ctgTestRspTableMeta); { AddrAny any("libtransport.so"); std::map result; any.get_global_func_addr_dynsym("^rpcSendRecv$", result); for (const auto &f : result) { - stub.set(f.second, ctgTestPrepareDbVgroups); + stub.set(f.second, ctgTestRspTableMeta); } } } -void ctgTestSetPrepareTableMeta() { +void ctgTestSetRspCTableMeta() { static Stub stub; - stub.set(rpcSendRecv, ctgTestPrepareTableMeta); + stub.set(rpcSendRecv, ctgTestRspCTableMeta); { AddrAny any("libtransport.so"); std::map result; any.get_global_func_addr_dynsym("^rpcSendRecv$", result); for (const auto &f : result) { - stub.set(f.second, ctgTestPrepareTableMeta); + stub.set(f.second, ctgTestRspCTableMeta); } } } -void ctgTestSetPrepareCTableMeta() { +void ctgTestSetRspSTableMeta() { static Stub stub; - stub.set(rpcSendRecv, ctgTestPrepareCTableMeta); + stub.set(rpcSendRecv, ctgTestRspSTableMeta); { AddrAny any("libtransport.so"); std::map result; any.get_global_func_addr_dynsym("^rpcSendRecv$", result); for (const auto &f : result) { - stub.set(f.second, ctgTestPrepareCTableMeta); + stub.set(f.second, ctgTestRspSTableMeta); } } } -void ctgTestSetPrepareSTableMeta() { +void ctgTestSetRspMultiSTableMeta() { static Stub stub; - stub.set(rpcSendRecv, ctgTestPrepareSTableMeta); + stub.set(rpcSendRecv, ctgTestRspMultiSTableMeta); { AddrAny any("libtransport.so"); std::map result; any.get_global_func_addr_dynsym("^rpcSendRecv$", result); for (const auto &f : result) { - stub.set(f.second, ctgTestPrepareSTableMeta); + stub.set(f.second, ctgTestRspMultiSTableMeta); } } } -void ctgTestSetPrepareMultiSTableMeta() { +void ctgTestSetRspByIdx() { static Stub stub; - stub.set(rpcSendRecv, ctgTestPrepareMultiSTableMeta); + stub.set(rpcSendRecv, ctgTestRspByIdx); { AddrAny any("libtransport.so"); std::map result; any.get_global_func_addr_dynsym("^rpcSendRecv$", result); for (const auto &f : result) { - stub.set(f.second, ctgTestPrepareMultiSTableMeta); + stub.set(f.second, ctgTestRspByIdx); } } } -void ctgTestSetPrepareDbVgroupsAndNormalMeta() { + +void ctgTestSetRspDbVgroupsAndNormalMeta() { static Stub stub; - stub.set(rpcSendRecv, ctgTestPrepareDbVgroupsAndNormalMeta); + stub.set(rpcSendRecv, ctgTestRspDbVgroupsAndNormalMeta); { AddrAny any("libtransport.so"); std::map result; any.get_global_func_addr_dynsym("^rpcSendRecv$", result); for (const auto &f : result) { - stub.set(f.second, ctgTestPrepareDbVgroupsAndNormalMeta); + stub.set(f.second, ctgTestRspDbVgroupsAndNormalMeta); } } } -void ctgTestSetPrepareDbVgroupsAndChildMeta() { +void ctgTestSetRspDbVgroupsAndChildMeta() { static Stub stub; - stub.set(rpcSendRecv, ctgTestPrepareDbVgroupsAndChildMeta); + stub.set(rpcSendRecv, ctgTestRspDbVgroupsAndChildMeta); { AddrAny any("libtransport.so"); std::map result; any.get_global_func_addr_dynsym("^rpcSendRecv$", result); for (const auto &f : result) { - stub.set(f.second, ctgTestPrepareDbVgroupsAndChildMeta); + stub.set(f.second, ctgTestRspDbVgroupsAndChildMeta); } } } -void ctgTestSetPrepareDbVgroupsAndSuperMeta() { +void ctgTestSetRspDbVgroupsAndSuperMeta() { static Stub stub; - stub.set(rpcSendRecv, ctgTestPrepareDbVgroupsAndSuperMeta); + stub.set(rpcSendRecv, ctgTestRspDbVgroupsAndSuperMeta); { AddrAny any("libtransport.so"); std::map result; any.get_global_func_addr_dynsym("^rpcSendRecv$", result); for (const auto &f : result) { - stub.set(f.second, ctgTestPrepareDbVgroupsAndSuperMeta); + stub.set(f.second, ctgTestRspDbVgroupsAndSuperMeta); } } } -void ctgTestSetPrepareDbVgroupsAndMultiSuperMeta() { +void ctgTestSetRspDbVgroupsAndMultiSuperMeta() { static Stub stub; - stub.set(rpcSendRecv, ctgTestPrepareDbVgroupsAndMultiSuperMeta); + stub.set(rpcSendRecv, ctgTestRspDbVgroupsAndMultiSuperMeta); { AddrAny any("libtransport.so"); std::map result; any.get_global_func_addr_dynsym("^rpcSendRecv$", result); for (const auto &f : result) { - stub.set(f.second, ctgTestPrepareDbVgroupsAndMultiSuperMeta); + stub.set(f.second, ctgTestRspDbVgroupsAndMultiSuperMeta); } } } @@ -719,17 +778,19 @@ void *ctgTestSetCtableMetaThread(void *param) { int32_t code = 0; SDBVgInfo dbVgroup = {0}; int32_t n = 0; - STableMetaOutput output = {0}; + STableMetaOutput *output = NULL; - ctgTestBuildCTableMetaOutput(&output); SCtgMetaAction action = {0}; action.act = CTG_ACT_UPDATE_TBL; while (!ctgTestStop) { + output = (STableMetaOutput *)malloc(sizeof(STableMetaOutput)); + ctgTestBuildCTableMetaOutput(output); + SCtgUpdateTblMsg *msg = (SCtgUpdateTblMsg *)malloc(sizeof(SCtgUpdateTblMsg)); msg->pCtg = pCtg; - msg->output = &output; + msg->output = output; action.data = msg; code = ctgActUpdateTbl(&action); @@ -745,8 +806,6 @@ void *ctgTestSetCtableMetaThread(void *param) { } } - tfree(output.tbMeta); - return NULL; } @@ -758,7 +817,7 @@ TEST(tableMeta, normalTable) { ctgTestInitLogFile(); - ctgTestSetPrepareDbVgroups(); + ctgTestSetRspDbVgroups(); initQueryModuleMsgHandle(); @@ -779,7 +838,11 @@ TEST(tableMeta, normalTable) { ASSERT_EQ(vgInfo.vgId, 8); ASSERT_EQ(vgInfo.epset.numOfEps, 3); - ctgTestSetPrepareTableMeta(); + while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM)) { + usleep(100); + } + + ctgTestSetRspTableMeta(); STableMeta *tableMeta = NULL; code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta); @@ -793,6 +856,11 @@ TEST(tableMeta, normalTable) { ASSERT_EQ(tableMeta->tableInfo.precision, 1); ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) { + usleep(100); + } + + tableMeta = NULL; code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta); ASSERT_EQ(code, 0); @@ -841,6 +909,7 @@ TEST(tableMeta, normalTable) { ASSERT_EQ(allStbNum, 0); catalogDestroy(); + memset(&gCtgMgmt, 0, sizeof(gCtgMgmt)); } TEST(tableMeta, childTableCase) { @@ -850,7 +919,7 @@ TEST(tableMeta, childTableCase) { ctgTestInitLogFile(); - ctgTestSetPrepareDbVgroupsAndChildMeta(); + ctgTestSetRspDbVgroupsAndChildMeta(); initQueryModuleMsgHandle(); @@ -877,6 +946,10 @@ TEST(tableMeta, childTableCase) { ASSERT_EQ(tableMeta->tableInfo.precision, 1); ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) { + usleep(100); + } + tableMeta = NULL; code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta); ASSERT_EQ(code, 0); @@ -939,6 +1012,7 @@ TEST(tableMeta, childTableCase) { ASSERT_EQ(allStbNum, 1); catalogDestroy(); + memset(&gCtgMgmt, 0, sizeof(gCtgMgmt)); } TEST(tableMeta, superTableCase) { @@ -946,7 +1020,7 @@ TEST(tableMeta, superTableCase) { void *mockPointer = (void *)0x1; SVgroupInfo vgInfo = {0}; - ctgTestSetPrepareDbVgroupsAndSuperMeta(); + ctgTestSetRspDbVgroupsAndSuperMeta(); initQueryModuleMsgHandle(); @@ -975,7 +1049,11 @@ TEST(tableMeta, superTableCase) { ASSERT_EQ(tableMeta->tableInfo.precision, 1); ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); - ctgTestSetPrepareCTableMeta(); + while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) { + usleep(100); + } + + ctgTestSetRspCTableMeta(); tableMeta = NULL; @@ -992,6 +1070,10 @@ TEST(tableMeta, superTableCase) { ASSERT_EQ(tableMeta->tableInfo.precision, 1); ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + while (2 != ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) { + usleep(100); + } + tableMeta = NULL; code = catalogRefreshGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta, 0); ASSERT_EQ(code, 0); @@ -1041,6 +1123,7 @@ TEST(tableMeta, superTableCase) { ASSERT_EQ(allStbNum, 1); catalogDestroy(); + memset(&gCtgMgmt, 0, sizeof(gCtgMgmt)); } TEST(tableMeta, rmStbMeta) { @@ -1050,7 +1133,7 @@ TEST(tableMeta, rmStbMeta) { ctgTestInitLogFile(); - ctgTestSetPrepareDbVgroupsAndSuperMeta(); + ctgTestSetRspDbVgroupsAndSuperMeta(); initQueryModuleMsgHandle(); @@ -1079,9 +1162,17 @@ TEST(tableMeta, rmStbMeta) { ASSERT_EQ(tableMeta->tableInfo.precision, 1); ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) { + usleep(100); + } + code = catalogRemoveStbMeta(pCtg, "1.db1", ctgTestDbId, ctgTestSTablename, ctgTestSuid); ASSERT_EQ(code, 0); + while (ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM) || ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM)) { + usleep(100); + } + ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM), 1); ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM), 0); ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_NUM), 0); @@ -1089,6 +1180,7 @@ TEST(tableMeta, rmStbMeta) { ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM), 0); catalogDestroy(); + memset(&gCtgMgmt, 0, sizeof(gCtgMgmt)); } TEST(tableMeta, updateStbMeta) { @@ -1098,7 +1190,7 @@ TEST(tableMeta, updateStbMeta) { ctgTestInitLogFile(); - ctgTestSetPrepareDbVgroupsAndSuperMeta(); + ctgTestSetRspDbVgroupsAndSuperMeta(); initQueryModuleMsgHandle(); @@ -1127,6 +1219,10 @@ TEST(tableMeta, updateStbMeta) { ASSERT_EQ(tableMeta->tableInfo.precision, 1); ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) { + usleep(100); + } + tfree(tableMeta); STableMetaRsp rsp = {0}; @@ -1135,6 +1231,16 @@ TEST(tableMeta, updateStbMeta) { code = catalogUpdateSTableMeta(pCtg, &rsp); ASSERT_EQ(code, 0); + while (true) { + uint64_t n = 0; + ctgDbgGetStatNum("runtime.qDoneNum", (void *)&n); + if (n != 3) { + usleep(100); + } else { + break; + } + } + ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM), 1); ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM), 1); ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_NUM), 1); @@ -1157,6 +1263,7 @@ TEST(tableMeta, updateStbMeta) { tfree(tableMeta); catalogDestroy(); + memset(&gCtgMgmt.stat, 0, sizeof(gCtgMgmt.stat)); } @@ -1167,7 +1274,15 @@ TEST(tableDistVgroup, normalTable) { SVgroupInfo *vgInfo = NULL; SArray *vgList = NULL; - ctgTestSetPrepareDbVgroupsAndNormalMeta(); + ctgTestInitLogFile(); + + memset(ctgTestRspFunc, 0, sizeof(ctgTestRspFunc)); + ctgTestRspIdx = 0; + ctgTestRspFunc[0] = CTGT_RSP_VGINFO; + ctgTestRspFunc[1] = CTGT_RSP_TBMETA; + ctgTestRspFunc[2] = CTGT_RSP_VGINFO; + + ctgTestSetRspByIdx(); initQueryModuleMsgHandle(); @@ -1191,6 +1306,7 @@ TEST(tableDistVgroup, normalTable) { ASSERT_EQ(vgInfo->epset.numOfEps, 3); catalogDestroy(); + memset(&gCtgMgmt, 0, sizeof(gCtgMgmt)); } TEST(tableDistVgroup, childTableCase) { @@ -1199,7 +1315,16 @@ TEST(tableDistVgroup, childTableCase) { SVgroupInfo *vgInfo = NULL; SArray *vgList = NULL; - ctgTestSetPrepareDbVgroupsAndChildMeta(); + ctgTestInitLogFile(); + + memset(ctgTestRspFunc, 0, sizeof(ctgTestRspFunc)); + ctgTestRspIdx = 0; + ctgTestRspFunc[0] = CTGT_RSP_VGINFO; + ctgTestRspFunc[1] = CTGT_RSP_CTBMETA; + ctgTestRspFunc[2] = CTGT_RSP_STBMETA; + ctgTestRspFunc[3] = CTGT_RSP_VGINFO; + + ctgTestSetRspByIdx(); initQueryModuleMsgHandle(); @@ -1223,6 +1348,7 @@ TEST(tableDistVgroup, childTableCase) { ASSERT_EQ(vgInfo->epset.numOfEps, 4); catalogDestroy(); + memset(&gCtgMgmt, 0, sizeof(gCtgMgmt)); } TEST(tableDistVgroup, superTableCase) { @@ -1231,7 +1357,18 @@ TEST(tableDistVgroup, superTableCase) { SVgroupInfo *vgInfo = NULL; SArray *vgList = NULL; - ctgTestSetPrepareDbVgroupsAndSuperMeta(); + ctgTestInitLogFile(); + + memset(ctgTestRspFunc, 0, sizeof(ctgTestRspFunc)); + ctgTestRspIdx = 0; + ctgTestRspFunc[0] = CTGT_RSP_VGINFO; + ctgTestRspFunc[1] = CTGT_RSP_STBMETA; + ctgTestRspFunc[2] = CTGT_RSP_STBMETA; + ctgTestRspFunc[3] = CTGT_RSP_VGINFO; + + ctgTestSetRspByIdx(); + + initQueryModuleMsgHandle(); @@ -1260,6 +1397,7 @@ TEST(tableDistVgroup, superTableCase) { ASSERT_EQ(vgInfo->epset.numOfEps, 3); catalogDestroy(); + memset(&gCtgMgmt, 0, sizeof(gCtgMgmt)); } TEST(dbVgroup, getSetDbVgroupCase) { @@ -1272,7 +1410,14 @@ TEST(dbVgroup, getSetDbVgroupCase) { ctgTestInitLogFile(); - ctgTestSetPrepareDbVgroupsAndNormalMeta(); + memset(ctgTestRspFunc, 0, sizeof(ctgTestRspFunc)); + ctgTestRspIdx = 0; + ctgTestRspFunc[0] = CTGT_RSP_VGINFO; + ctgTestRspFunc[1] = CTGT_RSP_TBMETA; + + + ctgTestSetRspByIdx(); + initQueryModuleMsgHandle(); @@ -1292,6 +1437,11 @@ TEST(dbVgroup, getSetDbVgroupCase) { ASSERT_EQ(code, 0); ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), ctgTestVgNum); + while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM)) { + usleep(100); + } + + code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); ASSERT_EQ(code, 0); ASSERT_EQ(vgInfo.vgId, 8); @@ -1309,6 +1459,17 @@ TEST(dbVgroup, getSetDbVgroupCase) { code = catalogUpdateDBVgInfo(pCtg, ctgTestDbname, ctgTestDbId, dbVgroup); ASSERT_EQ(code, 0); + while (true) { + uint64_t n = 0; + ctgDbgGetStatNum("runtime.qDoneNum", (void *)&n); + if (n != 3) { + usleep(100); + } else { + break; + } + } + + code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); ASSERT_EQ(code, 0); ASSERT_EQ(vgInfo.vgId, 7); @@ -1323,6 +1484,7 @@ TEST(dbVgroup, getSetDbVgroupCase) { taosArrayDestroy(vgList); catalogDestroy(); + memset(&gCtgMgmt, 0, sizeof(gCtgMgmt)); } TEST(multiThread, getSetRmSameDbVgroup) { @@ -1336,7 +1498,7 @@ TEST(multiThread, getSetRmSameDbVgroup) { ctgTestInitLogFile(); - ctgTestSetPrepareDbVgroups(); + ctgTestSetRspDbVgroups(); initQueryModuleMsgHandle(); @@ -1374,6 +1536,7 @@ TEST(multiThread, getSetRmSameDbVgroup) { sleep(1); catalogDestroy(); + memset(&gCtgMgmt, 0, sizeof(gCtgMgmt)); } TEST(multiThread, getSetRmDiffDbVgroup) { @@ -1387,7 +1550,7 @@ TEST(multiThread, getSetRmDiffDbVgroup) { ctgTestInitLogFile(); - ctgTestSetPrepareDbVgroups(); + ctgTestSetRspDbVgroups(); initQueryModuleMsgHandle(); @@ -1425,6 +1588,7 @@ TEST(multiThread, getSetRmDiffDbVgroup) { sleep(1); catalogDestroy(); + memset(&gCtgMgmt, 0, sizeof(gCtgMgmt)); } @@ -1440,7 +1604,7 @@ TEST(multiThread, ctableMeta) { ctgTestInitLogFile(); - ctgTestSetPrepareDbVgroupsAndChildMeta(); + ctgTestSetRspDbVgroupsAndChildMeta(); initQueryModuleMsgHandle(); @@ -1477,6 +1641,7 @@ TEST(multiThread, ctableMeta) { sleep(2); catalogDestroy(); + memset(&gCtgMgmt, 0, sizeof(gCtgMgmt)); } @@ -1495,7 +1660,7 @@ TEST(rentTest, allRent) { ctgTestInitLogFile(); - ctgTestSetPrepareDbVgroupsAndMultiSuperMeta(); + ctgTestSetRspDbVgroupsAndMultiSuperMeta(); initQueryModuleMsgHandle(); @@ -1525,6 +1690,10 @@ TEST(rentTest, allRent) { ASSERT_EQ(tableMeta->tableInfo.precision, 1); ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + while (ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM) < i) { + usleep(100); + } + code = catalogGetExpiredDBs(pCtg, &dbs, &num); ASSERT_EQ(code, 0); printf("%d - expired dbNum:%d\n", i, num); @@ -1550,6 +1719,7 @@ TEST(rentTest, allRent) { } catalogDestroy(); + memset(&gCtgMgmt, 0, sizeof(gCtgMgmt)); } int main(int argc, char **argv) { -- GitLab