From 09e6462c5fad9a5612ae64e96b93898e2fbe25a9 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 3 Mar 2022 09:09:11 +0800 Subject: [PATCH] feature/qnode --- include/common/tmsg.h | 4 + source/common/src/tmsg.c | 6 + source/dnode/mgmt/impl/src/dndVnodes.c | 3 + source/dnode/mnode/impl/src/mndVgroup.c | 3 + source/dnode/vnode/inc/vnode.h | 3 + source/dnode/vnode/src/vnd/vnodeMain.c | 3 + source/libs/catalog/inc/catalogInt.h | 41 +- source/libs/catalog/src/catalog.c | 305 ++++++++---- source/libs/catalog/test/catalogTests.cpp | 580 ++++++++++++++++++++-- 9 files changed, 812 insertions(+), 136 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ab0472a575..25d668e788 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -739,6 +739,9 @@ typedef struct { int32_t maxRows; int32_t commitTime; int32_t fsyncPeriod; + uint32_t hashBegin; + uint32_t hashEnd; + int8_t hashMethod; int8_t walLevel; int8_t precision; int8_t compression; @@ -749,6 +752,7 @@ typedef struct { int8_t selfIndex; int8_t streamMode; SReplica replicas[TSDB_MAX_REPLICA]; + } SCreateVnodeReq, SAlterVnodeReq; int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index d8c850c6af..ae31dff310 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2112,6 +2112,9 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR if (tEncodeI32(&encoder, pReq->maxRows) < 0) return -1; if (tEncodeI32(&encoder, pReq->commitTime) < 0) return -1; if (tEncodeI32(&encoder, pReq->fsyncPeriod) < 0) return -1; + if (tEncodeU32(&encoder, pReq->hashBegin) < 0) return -1; + if (tEncodeU32(&encoder, pReq->hashEnd) < 0) return -1; + if (tEncodeI8(&encoder, pReq->hashMethod) < 0) return -1; if (tEncodeI8(&encoder, pReq->walLevel) < 0) return -1; if (tEncodeI8(&encoder, pReq->precision) < 0) return -1; if (tEncodeI8(&encoder, pReq->compression) < 0) return -1; @@ -2152,6 +2155,9 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq * if (tDecodeI32(&decoder, &pReq->maxRows) < 0) return -1; if (tDecodeI32(&decoder, &pReq->commitTime) < 0) return -1; if (tDecodeI32(&decoder, &pReq->fsyncPeriod) < 0) return -1; + if (tDecodeU32(&decoder, &pReq->hashBegin) < 0) return -1; + if (tDecodeU32(&decoder, &pReq->hashEnd) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->hashMethod) < 0) return -1; if (tDecodeI8(&decoder, &pReq->walLevel) < 0) return -1; if (tDecodeI8(&decoder, &pReq->precision) < 0) return -1; if (tDecodeI8(&decoder, &pReq->compression) < 0) return -1; diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index f20493aa7f..2767461f12 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -523,6 +523,9 @@ static void dndGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { pCfg->walCfg.rollPeriod = 128; pCfg->walCfg.segSize = 128; pCfg->walCfg.vgId = pCreate->vgId; + pCfg->hashBegin = pCreate->hashBegin; + pCfg->hashEnd = pCreate->hashEnd; + pCfg->hashMethod = pCreate->hashMethod; } static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) { diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index b437b44417..f7b177f170 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -215,6 +215,9 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg createReq.replica = pVgroup->replica; createReq.selfIndex = -1; createReq.streamMode = pVgroup->streamMode; + createReq.hashBegin = pVgroup->hashBegin; + createReq.hashEnd = pVgroup->hashEnd; + createReq.hashMethod = pDb->hashMethod; for (int32_t v = 0; v < pVgroup->replica; ++v) { SReplica *pReplica = &createReq.replicas[v]; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 31f04e840a..9a4f920499 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -57,6 +57,9 @@ typedef struct { SMetaCfg metaCfg; STqCfg tqCfg; SWalCfg walCfg; + uint32_t hashBegin; + uint32_t hashEnd; + int8_t hashMethod; } SVnodeCfg; typedef struct { diff --git a/source/dnode/vnode/src/vnd/vnodeMain.c b/source/dnode/vnode/src/vnd/vnodeMain.c index c748907d6c..ba346064ae 100644 --- a/source/dnode/vnode/src/vnd/vnodeMain.c +++ b/source/dnode/vnode/src/vnd/vnodeMain.c @@ -30,6 +30,9 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { cfg.pDnode = pVnodeCfg->pDnode; cfg.pTfs = pVnodeCfg->pTfs; cfg.dbId = pVnodeCfg->dbId; + cfg.hashBegin = pVnodeCfg->hashBegin; + cfg.hashEnd = pVnodeCfg->hashEnd; + cfg.hashMethod = pVnodeCfg->hashMethod; } // Validate options diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index e71f559ad9..c4f1a117fe 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -60,6 +60,7 @@ typedef struct SCtgDebug { bool lockDebug; bool cacheDebug; bool apiDebug; + bool metaDebug; uint32_t showCachePeriodSec; } SCtgDebug; @@ -119,6 +120,10 @@ typedef struct SCatalogStat { SCtgCacheStat cache; } SCatalogStat; +typedef struct SCtgUpdateMsgHeader { + SCatalog* pCtg; +} SCtgUpdateMsgHeader; + typedef struct SCtgUpdateVgMsg { SCatalog* pCtg; char dbFName[TSDB_DB_FNAME_LEN]; @@ -145,6 +150,14 @@ typedef struct SCtgRemoveStbMsg { uint64_t suid; } SCtgRemoveStbMsg; +typedef struct SCtgRemoveTblMsg { + SCatalog* pCtg; + char dbFName[TSDB_DB_FNAME_LEN]; + char tbName[TSDB_TABLE_NAME_LEN]; + uint64_t dbId; +} SCtgRemoveTblMsg; + + typedef struct SCtgMetaAction { int32_t act; void *data; @@ -189,19 +202,21 @@ typedef struct SCtgAction { #define CTG_IS_META_TABLE(type) ((type) == META_TYPE_TABLE) #define CTG_IS_META_BOTH(type) ((type) == META_TYPE_BOTH_TABLE) -#define CTG_FLAG_STB 0x1 -#define CTG_FLAG_NOT_STB 0x2 -#define CTG_FLAG_UNKNOWN_STB 0x4 -#define CTG_FLAG_INF_DB 0x8 - -#define CTG_IS_STB(_flag) ((_flag) & CTG_FLAG_STB) -#define CTG_IS_NOT_STB(_flag) ((_flag) & CTG_FLAG_NOT_STB) -#define CTG_IS_UNKNOWN_STB(_flag) ((_flag) & CTG_FLAG_UNKNOWN_STB) -#define CTG_IS_INF_DB(_flag) ((_flag) & CTG_FLAG_INF_DB) -#define CTG_SET_INF_DB(_flag) ((_flag) |= CTG_FLAG_INF_DB) -#define CTG_SET_STB(_flag, tbType) do { (_flag) |= ((tbType) == TSDB_SUPER_TABLE) ? CTG_FLAG_STB : ((tbType) > TSDB_SUPER_TABLE ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB); } while (0) -#define CTG_GEN_STB_FLAG(_isStb) ((_isStb) == 1) ? CTG_FLAG_STB : ((_isStb) == 0 ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB) -#define CTG_TBTYPE_MATCH(_flag, tbType) (CTG_IS_UNKNOWN_STB(_flag) || (CTG_IS_STB(_flag) && (tbType) == TSDB_SUPER_TABLE) || (CTG_IS_NOT_STB(_flag) && (tbType) != TSDB_SUPER_TABLE)) +#define CTG_FLAG_STB 0x1 +#define CTG_FLAG_NOT_STB 0x2 +#define CTG_FLAG_UNKNOWN_STB 0x4 +#define CTG_FLAG_INF_DB 0x8 +#define CTG_FLAG_FORCE_UPDATE 0x10 + +#define CTG_FLAG_IS_STB(_flag) ((_flag) & CTG_FLAG_STB) +#define CTG_FLAG_IS_NOT_STB(_flag) ((_flag) & CTG_FLAG_NOT_STB) +#define CTG_FLAG_IS_UNKNOWN_STB(_flag) ((_flag) & CTG_FLAG_UNKNOWN_STB) +#define CTG_FLAG_IS_INF_DB(_flag) ((_flag) & CTG_FLAG_INF_DB) +#define CTG_FLAG_IS_FORCE_UPDATE(_flag) ((_flag) & CTG_FLAG_FORCE_UPDATE) +#define CTG_FLAG_SET_INF_DB(_flag) ((_flag) |= CTG_FLAG_INF_DB) +#define CTG_FLAG_SET_STB(_flag, tbType) do { (_flag) |= ((tbType) == TSDB_SUPER_TABLE) ? CTG_FLAG_STB : ((tbType) > TSDB_SUPER_TABLE ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB); } while (0) +#define CTG_FLAG_MAKE_STB(_isStb) (((_isStb) == 1) ? CTG_FLAG_STB : ((_isStb) == 0 ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB)) +#define CTG_FLAG_MATCH_STB(_flag, tbType) (CTG_FLAG_IS_UNKNOWN_STB(_flag) || (CTG_FLAG_IS_STB(_flag) && (tbType) == TSDB_SUPER_TABLE) || (CTG_FLAG_IS_NOT_STB(_flag) && (tbType) != TSDB_SUPER_TABLE)) #define CTG_IS_INF_DBNAME(_dbname) ((*(_dbname) == 'i') && (0 == strcmp(_dbname, TSDB_INFORMATION_SCHEMA_DB))) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index f0ea51c2f9..5779906761 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -72,6 +72,12 @@ int32_t ctgDbgEnableDebug(char *option) { return TSDB_CODE_SUCCESS; } + if (0 == strcasecmp(option, "meta")) { + gCTGDebug.metaDebug = true; + qDebug("api debug enabled"); + return TSDB_CODE_SUCCESS; + } + qError("invalid debug option:%s", option); return TSDB_CODE_CTG_INTERNAL_ERROR; @@ -148,9 +154,30 @@ int32_t ctgDbgGetClusterCacheNum(SCatalog* pCtg, int32_t type) { return num; } +void ctgDbgShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p) { + if (!gCTGDebug.metaDebug) { + return; + } + + STableComInfo *c = &p->tableInfo; + + if (TSDB_CHILD_TABLE == p->tableType) { + ctgDebug("table [%s] meta: type:%d, vgId:%d, uid:%" PRIx64 ",suid:%" PRIx64, tbName, p->tableType, p->vgId, p->uid, p->suid); + return; + } else { + ctgDebug("table [%s] meta: type:%d, vgId:%d, uid:%" PRIx64 ",suid:%" PRIx64 ",sv:%d, tv:%d, tagNum:%d, precision:%d, colNum:%d, rowSize:%d", + tbName, p->tableType, p->vgId, p->uid, p->suid, p->sversion, p->tversion, c->numOfTags, c->precision, c->numOfColumns, c->rowSize); + } + + int32_t colNum = c->numOfColumns + c->numOfTags; + for (int32_t i = 0; i < colNum; ++i) { + SSchema *s = &p->schema[i]; + ctgDebug("[%d] name:%s, type:%d, colId:%d, bytes:%d", i, s->name, s->type, s->colId, s->bytes); + } +} -void ctgDbgShowDBCache(SHashObj *dbHash) { - if (NULL == dbHash) { +void ctgDbgShowDBCache(SCatalog* pCtg, SHashObj *dbHash) { + if (NULL == dbHash || !gCTGDebug.cacheDebug) { return; } @@ -164,31 +191,24 @@ void ctgDbgShowDBCache(SHashObj *dbHash) { dbCache = (SCtgDBCache *)pIter; taosHashGetKey(dbCache, (void **)&dbFName, &len); - - CTG_CACHE_DEBUG("** %dth db [%.*s][%"PRIx64"] **", i, (int32_t)len, dbFName, dbCache->dbId); - CTG_CACHE_DEBUG("deleted: %d", dbCache->deleted); + int32_t metaNum = dbCache->tbCache.metaCache ? taosHashGetSize(dbCache->tbCache.metaCache) : 0; + int32_t stbNum = dbCache->tbCache.stbCache ? taosHashGetSize(dbCache->tbCache.stbCache) : 0; + int32_t vgVersion = CTG_DEFAULT_INVALID_VERSION; + int32_t hashMethod = -1; + int32_t vgNum = 0; + if (dbCache->vgInfo) { - CTG_CACHE_DEBUG("vgVersion: %d", dbCache->vgInfo->vgVersion); - CTG_CACHE_DEBUG("hashMethod: %d", dbCache->vgInfo->hashMethod); + vgVersion = dbCache->vgInfo->vgVersion; + hashMethod = dbCache->vgInfo->hashMethod; if (dbCache->vgInfo->vgHash) { - CTG_CACHE_DEBUG("vgNum: %d", taosHashGetSize(dbCache->vgInfo->vgHash)); - //TODO - } else { - CTG_CACHE_DEBUG("vgHash: %p", dbCache->vgInfo->vgHash); + vgNum = taosHashGetSize(dbCache->vgInfo->vgHash); } - } else { - CTG_CACHE_DEBUG("vgInfo: %p", dbCache->vgInfo); - } - - if (dbCache->tbCache.metaCache) { - CTG_CACHE_DEBUG("metaNum: %d", taosHashGetSize(dbCache->tbCache.metaCache)); - } - - if (dbCache->tbCache.stbCache) { - CTG_CACHE_DEBUG("stbNum: %d", taosHashGetSize(dbCache->tbCache.stbCache)); - } + } + ctgDebug("[%d] db [%.*s][%"PRIx64"] %s: metaNum:%d, stbNum:%d, vgVersion:%d, hashMethod:%d, vgNum:%d", + i, (int32_t)len, dbFName, dbCache->dbId, dbCache->deleted?"deleted":"", metaNum, stbNum, vgVersion, hashMethod, vgNum); + pIter = taosHashIterate(dbHash, pIter); } } @@ -197,15 +217,15 @@ void ctgDbgShowDBCache(SHashObj *dbHash) { void ctgDbgShowClusterCache(SCatalog* pCtg) { - if (NULL == pCtg) { + if (!gCTGDebug.cacheDebug || NULL == pCtg) { return; } - CTG_CACHE_DEBUG("## cluster %"PRIx64" %p cache Info ##", pCtg->clusterId, pCtg); - CTG_CACHE_DEBUG("db:%d meta:%d stb:%d dbRent:%d stbRent:%d", ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM), ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM), + ctgDebug("## cluster %"PRIx64" %p cache Info ##", pCtg->clusterId, pCtg); + ctgDebug("db:%d meta:%d stb:%d dbRent:%d stbRent:%d", ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM), ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM), ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_NUM), ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_RENT_NUM), ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM)); - ctgDbgShowDBCache(pCtg->dbCache); + ctgDbgShowDBCache(pCtg, pCtg->dbCache); } @@ -292,6 +312,66 @@ _return: } +int32_t ctgPushRmStbMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid) { + int32_t code = 0; + SCtgMetaAction action= {.act = CTG_ACT_REMOVE_STB}; + SCtgRemoveStbMsg *msg = malloc(sizeof(SCtgRemoveStbMsg)); + if (NULL == msg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveStbMsg)); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + msg->pCtg = pCtg; + strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); + strncpy(msg->stbName, stbName, sizeof(msg->stbName)); + msg->dbId = dbId; + msg->suid = suid; + + action.data = msg; + + CTG_ERR_JRET(ctgPushAction(&action)); + + ctgDebug("action [%s] added into queue", gCtgAction[action.act].name); + + return TSDB_CODE_SUCCESS; + +_return: + + tfree(action.data); + CTG_RET(code); +} + + + +int32_t ctgPushRmTblMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName) { + int32_t code = 0; + SCtgMetaAction action= {.act = CTG_ACT_REMOVE_TBL}; + SCtgRemoveTblMsg *msg = malloc(sizeof(SCtgRemoveTblMsg)); + if (NULL == msg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveTblMsg)); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + msg->pCtg = pCtg; + strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); + strncpy(msg->tbName, tbName, sizeof(msg->tbName)); + msg->dbId = dbId; + + action.data = msg; + + CTG_ERR_JRET(ctgPushAction(&action)); + + ctgDebug("action [%s] added into queue", gCtgAction[action.act].name); + + return TSDB_CODE_SUCCESS; + +_return: + + tfree(action.data); + CTG_RET(code); +} + + void ctgFreeTableMetaCache(SCtgTbMetaCache *cache) { CTG_LOCK(CTG_WRITE, &cache->stbLock); if (cache->stbCache) { @@ -554,7 +634,7 @@ int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, } -int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STableMeta** pTableMeta, int32_t *exist, int32_t flag) { +int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STableMeta** pTableMeta, int32_t *exist, int32_t flag, uint64_t *dbId) { if (NULL == pCtg->dbCache) { *exist = 0; ctgWarn("empty tbmeta cache, tbName:%s", pTableName->tname); @@ -562,7 +642,7 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable } char dbFName[TSDB_DB_FNAME_LEN] = {0}; - if (CTG_IS_INF_DB(flag)) { + if (CTG_FLAG_IS_INF_DB(flag)) { strcpy(dbFName, pTableName->dbname); } else { tNameGetFullDbName(pTableName, dbFName); @@ -590,6 +670,9 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable } *exist = 1; + if (dbId) { + *dbId = dbCache->dbId; + } tbMeta = *pTableMeta; @@ -646,7 +729,7 @@ int32_t ctgGetTableTypeFromCache(SCatalog* pCtg, const SName* pTableName, int32_ } char dbFName[TSDB_DB_FNAME_LEN] = {0}; - if (CTG_IS_INF_DB(flag)) { + if (CTG_FLAG_IS_INF_DB(flag)) { strcpy(dbFName, pTableName->dbname); } else { tNameGetFullDbName(pTableName, dbFName); @@ -1304,18 +1387,21 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui STableMeta *orig = taosHashGet(tbCache->metaCache, tbName, strlen(tbName)); if (orig) { origType = orig->tableType; - origSuid = orig->suid; - if (origType == TSDB_SUPER_TABLE && ((!isStb) || origSuid != meta->suid)) { - CTG_LOCK(CTG_WRITE, &tbCache->stbLock); - if (taosHashRemove(tbCache->stbCache, &orig->suid, sizeof(orig->suid))) { - ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid); + if (origType == TSDB_SUPER_TABLE) { + if ((!isStb) || orig->suid != meta->suid) { + CTG_LOCK(CTG_WRITE, &tbCache->stbLock); + if (taosHashRemove(tbCache->stbCache, &orig->suid, sizeof(orig->suid))) { + ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid); + } + CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock); + + ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid); + + ctgMetaRentRemove(&pCtg->stbRent, orig->suid, ctgStbVersionCompare); } - CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock); - ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid); - - ctgMetaRentRemove(&pCtg->stbRent, orig->suid, ctgStbVersionCompare); + origSuid = orig->suid; } } @@ -1334,13 +1420,14 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui } ctgDebug("tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType); + ctgDbgShowTableMeta(pCtg, tbName, meta); if (!isStb) { CTG_UNLOCK(CTG_READ, &tbCache->metaLock); return TSDB_CODE_SUCCESS; } - if (isStb && origSuid == meta->suid) { + if (origType == TSDB_SUPER_TABLE && origSuid == meta->suid) { CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock); CTG_UNLOCK(CTG_READ, &tbCache->metaLock); return TSDB_CODE_SUCCESS; @@ -1506,7 +1593,7 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, SVgroupInfo vgroupInfo = {0}; int32_t code = 0; - if (!CTG_IS_INF_DB(flag)) { + if (!CTG_FLAG_IS_INF_DB(flag)) { CTG_ERR_RET(catalogGetTableHashVgroup(pCtg, pTrans, pMgmtEps, pTableName, &vgroupInfo)); } @@ -1518,11 +1605,11 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } - if (CTG_IS_INF_DB(flag)) { + if (CTG_FLAG_IS_INF_DB(flag)) { ctgDebug("will refresh tbmeta, supposed in information_schema, tbName:%s", tNameGetTableName(pTableName)); CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCtg, pTrans, pMgmtEps, (char *)pTableName->dbname, (char *)pTableName->tname, output)); - } else if (CTG_IS_STB(flag)) { + } else if (CTG_FLAG_IS_STB(flag)) { ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s", tNameGetTableName(pTableName)); // if get from mnode failed, will not try vnode @@ -1538,14 +1625,17 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCtg, pTrans, pMgmtEps, pTableName, &vgroupInfo, output)); if (CTG_IS_META_TABLE(output->metaType) && TSDB_SUPER_TABLE == output->tbMeta->tableType) { - ctgDebug("will continue to refresh tbmeta since got stb, tbName:%s, metaType:%d", tNameGetTableName(pTableName), output->metaType); + ctgDebug("will continue to refresh tbmeta since got stb, tbName:%s", tNameGetTableName(pTableName)); tfree(output->tbMeta); CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCtg, pTrans, pMgmtEps, output->dbFName, output->tbName, output)); } else if (CTG_IS_META_BOTH(output->metaType)) { int32_t exist = 0; - CTG_ERR_JRET(ctgIsTableMetaExistInCache(pCtg, output->dbFName, output->tbName, &exist)); + if (!CTG_FLAG_IS_FORCE_UPDATE(flag)) { + CTG_ERR_JRET(ctgIsTableMetaExistInCache(pCtg, output->dbFName, output->tbName, &exist)); + } + if (0 == exist) { CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCtg, pTrans, pMgmtEps, output->dbFName, output->tbName, &moutput)); @@ -1606,35 +1696,40 @@ _return: CTG_RET(code); } -int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta, int32_t flag) { +int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t flag) { if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } int32_t exist = 0; int32_t code = 0; + uint64_t dbId = 0; + uint64_t suid = 0; + STableMetaOutput *output = NULL; if (CTG_IS_INF_DBNAME(pTableName->dbname)) { - CTG_SET_INF_DB(flag); + CTG_FLAG_SET_INF_DB(flag); } - if ((!forceUpdate) || (CTG_IS_INF_DB(flag))) { - CTG_ERR_RET(ctgGetTableMetaFromCache(pCtg, pTableName, pTableMeta, &exist, flag)); + CTG_ERR_RET(ctgGetTableMetaFromCache(pCtg, pTableName, pTableMeta, &exist, flag, &dbId)); - if (exist && CTG_TBTYPE_MATCH(flag, (*pTableMeta)->tableType)) { - return TSDB_CODE_SUCCESS; + int32_t tbType = 0; + + if (exist) { + if (CTG_FLAG_MATCH_STB(flag, (*pTableMeta)->tableType) && ((!CTG_FLAG_IS_FORCE_UPDATE(flag)) || (CTG_FLAG_IS_INF_DB(flag)))) { + goto _return; } - tfree(*pTableMeta); - } else if (CTG_IS_UNKNOWN_STB(flag)) { - int32_t tbType = 0; - - CTG_ERR_RET(ctgGetTableTypeFromCache(pCtg, pTableName, &tbType, flag)); + tbType = (*pTableMeta)->tableType; + suid = (*pTableMeta)->suid; - CTG_SET_STB(flag, tbType); + tfree(*pTableMeta); + } + + if (CTG_FLAG_IS_UNKNOWN_STB(flag)) { + CTG_FLAG_SET_STB(flag, tbType); } - STableMetaOutput *output = NULL; while (true) { CTG_ERR_JRET(ctgRefreshTblMeta(pCtg, pRpc, pMgmtEps, pTableName, flag, &output)); @@ -1662,7 +1757,7 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons SName stbName = *pTableName; strcpy(stbName.tname, output->tbName); - CTG_ERR_JRET(ctgGetTableMetaFromCache(pCtg, &stbName, pTableMeta, &exist, flag)); + CTG_ERR_JRET(ctgGetTableMetaFromCache(pCtg, &stbName, pTableMeta, &exist, flag, NULL)); if (0 == exist) { ctgDebug("stb no longer exist, dbFName:%s, tbName:%s", output->dbFName, pTableName->tname); continue; @@ -1675,10 +1770,26 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons _return: + if (CTG_TABLE_NOT_EXIST(code) && exist) { + char dbFName[TSDB_DB_FNAME_LEN] = {0}; + if (CTG_FLAG_IS_INF_DB(flag)) { + strcpy(dbFName, pTableName->dbname); + } else { + tNameGetFullDbName(pTableName, dbFName); + } + + if (TSDB_SUPER_TABLE == tbType) { + ctgPushRmStbMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, suid); + } else { + ctgPushRmTblMsgInQueue(pCtg, dbFName, dbId, pTableName->tname); + } + } + tfree(output); if (*pTableMeta) { ctgDebug("tbmeta returned, tbName:%s, tbType:%d", pTableName->tname, (*pTableMeta)->tableType); + ctgDbgShowTableMeta(pCtg, pTableName->tname, *pTableMeta); } CTG_RET(code); @@ -1694,7 +1805,7 @@ int32_t ctgActUpdateVg(SCtgMetaAction *action) { _return: - tfree(msg->dbInfo); + ctgFreeVgInfo(msg->dbInfo); tfree(msg); CTG_RET(code); @@ -1780,7 +1891,6 @@ _return: int32_t ctgActRemoveStb(SCtgMetaAction *action) { int32_t code = 0; SCtgRemoveStbMsg *msg = action->data; - bool removed = false; SCatalog* pCtg = msg->pCtg; SCtgDBCache *dbCache = NULL; @@ -1826,7 +1936,36 @@ _return: } int32_t ctgActRemoveTbl(SCtgMetaAction *action) { + int32_t code = 0; + SCtgRemoveTblMsg *msg = action->data; + SCatalog* pCtg = msg->pCtg; + + SCtgDBCache *dbCache = NULL; + ctgGetDBCache(pCtg, msg->dbFName, &dbCache); + if (NULL == dbCache) { + return TSDB_CODE_SUCCESS; + } + + if (dbCache->dbId != msg->dbId) { + ctgDebug("dbId already modified, dbFName:%s, current:%"PRIx64", dbId:%"PRIx64", tbName:%s", msg->dbFName, dbCache->dbId, msg->dbId, msg->tbName); + return TSDB_CODE_SUCCESS; + } + + CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock); + if (taosHashRemove(dbCache->tbCache.metaCache, msg->tbName, strlen(msg->tbName))) { + CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); + ctgError("stb not exist in cache, dbFName:%s, tbName:%s", msg->dbFName, msg->tbName); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); + + ctgInfo("table removed from cache, dbFName:%s, tbName:%s", msg->dbFName, msg->tbName); + +_return: + tfree(msg); + + CTG_RET(code); } @@ -1846,12 +1985,15 @@ void* ctgUpdateThreadFunc(void* param) { SCtgMetaAction *action = NULL; ctgPopAction(&action); + SCatalog *pCtg = ((SCtgUpdateMsgHeader *)action->data)->pCtg; - qDebug("process %s action", gCtgAction[action->act].name); + ctgDebug("process [%s] action", gCtgAction[action->act].name); (*gCtgAction[action->act].func)(action); CTG_STAT_ADD(gCtgMgmt.stat.runtime.qDoneNum); + + ctgDbgShowClusterCache(pCtg); } CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); @@ -2121,22 +2263,20 @@ int32_t catalogUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); msg->dbId = dbId; msg->dbInfo = dbInfo; - dbInfo = NULL; action.data = msg; CTG_ERR_JRET(ctgPushAction(&action)); + dbInfo = NULL; + ctgDebug("action [%s] added into queue", gCtgAction[action.act].name); CTG_API_LEAVE(code); _return: - if (dbInfo) { - taosHashCleanup(dbInfo->vgHash); - tfree(dbInfo); - } + ctgFreeVgInfo(dbInfo); tfree(msg); @@ -2179,31 +2319,12 @@ int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, CTG_API_LEAVE(TSDB_CODE_SUCCESS); } - SCtgMetaAction action= {.act = CTG_ACT_REMOVE_STB}; - SCtgRemoveStbMsg *msg = malloc(sizeof(SCtgRemoveStbMsg)); - if (NULL == msg) { - ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveStbMsg)); - CTG_API_LEAVE(TSDB_CODE_CTG_MEM_ERROR); - } - - msg->pCtg = pCtg; - strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); - strncpy(msg->stbName, stbName, sizeof(msg->stbName)); - msg->dbId = dbId; - msg->suid = suid; - - action.data = msg; - - CTG_ERR_JRET(ctgPushAction(&action)); - - ctgDebug("action [%s] added into queue", gCtgAction[action.act].name); + CTG_ERR_JRET(ctgPushRmStbMsgInQueue(pCtg, dbFName, dbId, stbName, suid)); CTG_API_LEAVE(TSDB_CODE_SUCCESS); _return: - tfree(action.data); - CTG_API_LEAVE(code); } @@ -2211,13 +2332,13 @@ _return: int32_t catalogGetTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { CTG_API_ENTER(); - CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTrans, pMgmtEps, pTableName, false, pTableMeta, CTG_FLAG_UNKNOWN_STB)); + CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTrans, pMgmtEps, pTableName, pTableMeta, CTG_FLAG_UNKNOWN_STB)); } int32_t catalogGetSTableMeta(SCatalog* pCtg, void * pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { CTG_API_ENTER(); - CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTrans, pMgmtEps, pTableName, false, pTableMeta, CTG_FLAG_STB)); + CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTrans, pMgmtEps, pTableName, pTableMeta, CTG_FLAG_STB)); } int32_t catalogUpdateSTableMeta(SCatalog* pCtg, STableMetaRsp *rspMsg) { @@ -2279,13 +2400,13 @@ int32_t catalogRefreshTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } - CTG_API_LEAVE(ctgRefreshTblMeta(pCtg, pTrans, pMgmtEps, pTableName, CTG_GEN_STB_FLAG(isSTable), NULL)); + CTG_API_LEAVE(ctgRefreshTblMeta(pCtg, pTrans, pMgmtEps, pTableName, CTG_FLAG_FORCE_UPDATE | CTG_FLAG_MAKE_STB(isSTable), NULL)); } int32_t catalogRefreshGetTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) { CTG_API_ENTER(); - CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTrans, pMgmtEps, pTableName, true, pTableMeta, CTG_GEN_STB_FLAG(isSTable))); + CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTrans, pMgmtEps, pTableName, pTableMeta, CTG_FLAG_FORCE_UPDATE | CTG_FLAG_MAKE_STB(isSTable))); } int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgList) { @@ -2309,7 +2430,7 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgm *pVgList = NULL; - CTG_ERR_JRET(ctgGetTableMeta(pCtg, pRpc, pMgmtEps, pTableName, false, &tbMeta, CTG_FLAG_UNKNOWN_STB)); + CTG_ERR_JRET(ctgGetTableMeta(pCtg, pRpc, pMgmtEps, pTableName, &tbMeta, CTG_FLAG_UNKNOWN_STB)); char db[TSDB_DB_FNAME_LEN] = {0}; tNameGetFullDbName(pTableName, db); @@ -2441,7 +2562,7 @@ int32_t catalogGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, SName *name = taosArrayGet(pReq->pTableName, i); STableMeta *pTableMeta = NULL; - CTG_ERR_JRET(ctgGetTableMeta(pCtg, pTrans, pMgmtEps, name, false, &pTableMeta, CTG_FLAG_UNKNOWN_STB)); + CTG_ERR_JRET(ctgGetTableMeta(pCtg, pTrans, pMgmtEps, name, &pTableMeta, CTG_FLAG_UNKNOWN_STB)); if (NULL == taosArrayPush(pRsp->pTableMeta, &pTableMeta)) { ctgError("taosArrayPush failed, idx:%d", i); diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index b7432429f4..b417a645be 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -38,7 +38,7 @@ namespace { extern "C" int32_t ctgGetTableMetaFromCache(struct SCatalog *pCatalog, const SName *pTableName, STableMeta **pTableMeta, - int32_t *exist, int32_t flag); + int32_t *exist, int32_t flag, uint64_t *dbId); extern "C" int32_t ctgDbgGetClusterCacheNum(struct SCatalog* pCatalog, int32_t type); extern "C" int32_t ctgActUpdateTbl(SCtgMetaAction *action); extern "C" int32_t ctgDbgEnableDebug(char *option); @@ -57,12 +57,14 @@ enum { CTGT_RSP_CTBMETA, CTGT_RSP_STBMETA, CTGT_RSP_MSTBMETA, + CTGT_RSP_TBMETA_NOT_EXIST, }; bool ctgTestStop = false; bool ctgTestEnableSleep = false; +bool ctgTestEnableLog = true; bool ctgTestDeadLoop = false; -int32_t ctgTestPrintNum = 200000; +int32_t ctgTestPrintNum = 10000; int32_t ctgTestMTRunSec = 5; int32_t ctgTestCurrentVgVersion = 0; @@ -74,14 +76,18 @@ int32_t ctgTestSVersion = 1; int32_t ctgTestTVersion = 1; int32_t ctgTestSuid = 2; uint64_t ctgTestDbId = 33; +uint64_t ctgTestNormalTblUid = 1; uint64_t ctgTestClusterId = 0x1; char *ctgTestDbname = "1.db1"; char *ctgTestTablename = "table1"; char *ctgTestCTablename = "ctable1"; char *ctgTestSTablename = "stable1"; +char *ctgTestCurrentCTableName = NULL; +char *ctgTestCurrentTableName = NULL; +char *ctgTestCurrentSTableName = NULL; -int32_t ctgTestRspFunc[10] = {0}; +int32_t ctgTestRspFunc[100] = {0}; int32_t ctgTestRspIdx = 0; void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) { @@ -123,6 +129,10 @@ void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) { } void ctgTestInitLogFile() { + if (!ctgTestEnableLog) { + return; + } + const char *defaultLogFileNamePrefix = "taoslog"; const int32_t maxLogFileNum = 10; @@ -131,6 +141,8 @@ void ctgTestInitLogFile() { strcpy(tsLogDir, "/var/log/taos"); ctgDbgEnableDebug("api"); + ctgDbgEnableDebug("meta"); + ctgDbgEnableDebug("cache"); if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) { printf("failed to open log file in directory:%s\n", tsLogDir); @@ -321,7 +333,7 @@ void ctgTestRspTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg * metaRsp.sversion = ctgTestSVersion; metaRsp.tversion = ctgTestTVersion; metaRsp.suid = 0; - metaRsp.tuid = 0x0000000000000001; + metaRsp.tuid = ctgTestNormalTblUid++; metaRsp.vgId = 8; metaRsp.pSchemas = (SSchema *)malloc((metaRsp.numOfTags + metaRsp.numOfColumns) * sizeof(SSchema)); @@ -349,10 +361,15 @@ void ctgTestRspTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg * tFreeSTableMetaRsp(&metaRsp); } +void ctgTestRspTableMetaNotExist(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + pRsp->code = CTG_ERR_CODE_TABLE_NOT_EXIST; +} + + void ctgTestRspCTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { STableMetaRsp metaRsp = {0}; strcpy(metaRsp.dbFName, ctgTestDbname); - strcpy(metaRsp.tbName, ctgTestCTablename); + strcpy(metaRsp.tbName, ctgTestCurrentCTableName ? ctgTestCurrentCTableName : ctgTestCTablename); strcpy(metaRsp.stbName, ctgTestSTablename); metaRsp.numOfTags = ctgTestTagNum; metaRsp.numOfColumns = ctgTestColNum; @@ -399,7 +416,7 @@ void ctgTestRspCTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg void ctgTestRspSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { STableMetaRsp metaRsp = {0}; strcpy(metaRsp.dbFName, ctgTestDbname); - strcpy(metaRsp.tbName, ctgTestSTablename); + strcpy(metaRsp.tbName, ctgTestCurrentSTableName ? ctgTestCurrentSTableName : ctgTestSTablename); strcpy(metaRsp.stbName, ctgTestSTablename); metaRsp.numOfTags = ctgTestTagNum; metaRsp.numOfColumns = ctgTestColNum; @@ -409,7 +426,7 @@ void ctgTestRspSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg metaRsp.sversion = ctgTestSVersion; metaRsp.tversion = ctgTestTVersion; metaRsp.suid = ctgTestSuid; - metaRsp.tuid = ctgTestSuid; + metaRsp.tuid = ctgTestSuid++; metaRsp.vgId = 0; metaRsp.pSchemas = (SSchema *)malloc((metaRsp.numOfTags + metaRsp.numOfColumns) * sizeof(SSchema)); @@ -511,6 +528,9 @@ void ctgTestRspByIdx(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp case CTGT_RSP_MSTBMETA: ctgTestRspMultiSTableMeta(shandle, pEpSet, pMsg, pRsp); break; + case CTGT_RSP_TBMETA_NOT_EXIST: + ctgTestRspTableMetaNotExist(shandle, pEpSet, pMsg, pRsp); + break; default: break; } @@ -773,7 +793,7 @@ void *ctgTestGetCtableMetaThread(void *param) { strcpy(cn.tname, ctgTestCTablename); while (!ctgTestStop) { - code = ctgGetTableMetaFromCache(pCtg, &cn, &tbMeta, &exist, 0); + code = ctgGetTableMetaFromCache(pCtg, &cn, &tbMeta, &exist, 0, NULL); if (code || 0 == exist) { assert(0); } @@ -828,7 +848,7 @@ void *ctgTestSetCtableMetaThread(void *param) { return NULL; } -#if 0 +#if 1 TEST(tableMeta, normalTable) { @@ -860,7 +880,7 @@ TEST(tableMeta, normalTable) { ASSERT_EQ(vgInfo.epset.numOfEps, 3); while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM)) { - usleep(10000); + usleep(50000); } ctgTestSetRspTableMeta(); @@ -870,6 +890,7 @@ TEST(tableMeta, normalTable) { ASSERT_EQ(code, 0); ASSERT_EQ(tableMeta->vgId, 8); ASSERT_EQ(tableMeta->tableType, TSDB_NORMAL_TABLE); + ASSERT_EQ(tableMeta->uid, ctgTestNormalTblUid - 1); ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); @@ -880,7 +901,7 @@ TEST(tableMeta, normalTable) { while (true) { uint32_t n = ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM); if (0 == n) { - usleep(10000); + usleep(50000); } else { break; } @@ -975,7 +996,7 @@ TEST(tableMeta, childTableCase) { while (true) { uint32_t n = ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM); if (0 == n) { - usleep(10000); + usleep(50000); } else { break; } @@ -994,7 +1015,7 @@ TEST(tableMeta, childTableCase) { ASSERT_EQ(tableMeta->tableInfo.precision, 1); ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); - tableMeta = NULL; + tfree(tableMeta); strcpy(n.tname, ctgTestSTablename); code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta); @@ -1074,8 +1095,8 @@ TEST(tableMeta, superTableCase) { ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE); ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); - ASSERT_EQ(tableMeta->uid, ctgTestSuid); - ASSERT_EQ(tableMeta->suid, ctgTestSuid); + ASSERT_EQ(tableMeta->uid, ctgTestSuid - 1); + ASSERT_EQ(tableMeta->suid, ctgTestSuid - 1); ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum); ASSERT_EQ(tableMeta->tableInfo.precision, 1); @@ -1084,7 +1105,7 @@ TEST(tableMeta, superTableCase) { while (true) { uint32_t n = ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM); if (0 == n) { - usleep(10000); + usleep(50000); } else { break; } @@ -1111,7 +1132,7 @@ TEST(tableMeta, superTableCase) { while (true) { uint32_t n = ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM); if (2 != n) { - usleep(10000); + usleep(50000); } else { break; } @@ -1199,8 +1220,8 @@ TEST(tableMeta, rmStbMeta) { ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE); ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); - ASSERT_EQ(tableMeta->uid, ctgTestSuid); - ASSERT_EQ(tableMeta->suid, ctgTestSuid); + ASSERT_EQ(tableMeta->uid, ctgTestSuid - 1); + ASSERT_EQ(tableMeta->suid, ctgTestSuid - 1); ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum); ASSERT_EQ(tableMeta->tableInfo.precision, 1); @@ -1209,21 +1230,21 @@ TEST(tableMeta, rmStbMeta) { while (true) { uint32_t n = ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM); if (0 == n) { - usleep(10000); + usleep(50000); } else { break; } } - code = catalogRemoveStbMeta(pCtg, "1.db1", ctgTestDbId, ctgTestSTablename, ctgTestSuid); + code = catalogRemoveStbMeta(pCtg, "1.db1", ctgTestDbId, ctgTestSTablename, ctgTestSuid - 1); ASSERT_EQ(code, 0); while (true) { int32_t n = ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM); int32_t m = ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM); if (n || m) { - usleep(10000); + usleep(50000); } else { break; } @@ -1269,8 +1290,8 @@ TEST(tableMeta, updateStbMeta) { ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE); ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); - ASSERT_EQ(tableMeta->uid, ctgTestSuid); - ASSERT_EQ(tableMeta->suid, ctgTestSuid); + ASSERT_EQ(tableMeta->uid, ctgTestSuid - 1); + ASSERT_EQ(tableMeta->suid, ctgTestSuid - 1); ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum); ASSERT_EQ(tableMeta->tableInfo.precision, 1); @@ -1279,7 +1300,7 @@ TEST(tableMeta, updateStbMeta) { while (true) { uint32_t n = ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM); if (0 == n) { - usleep(10000); + usleep(50000); } else { break; } @@ -1299,7 +1320,7 @@ TEST(tableMeta, updateStbMeta) { uint64_t n = 0; ctgDbgGetStatNum("runtime.qDoneNum", (void *)&n); if (n != 3) { - usleep(10000); + usleep(50000); } else { break; } @@ -1330,6 +1351,499 @@ TEST(tableMeta, updateStbMeta) { memset(&gCtgMgmt.stat, 0, sizeof(gCtgMgmt.stat)); } +TEST(refreshGetMeta, normal2normal) { + struct SCatalog *pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo vgInfo = {0}; + SArray *vgList = NULL; + + ctgTestInitLogFile(); + + memset(ctgTestRspFunc, 0, sizeof(ctgTestRspFunc)); + ctgTestRspIdx = 0; + ctgTestRspFunc[0] = CTGT_RSP_VGINFO; + ctgTestRspFunc[1] = CTGT_RSP_TBMETA; + ctgTestRspFunc[2] = CTGT_RSP_TBMETA; + + ctgTestSetRspByIdx(); + + initQueryModuleMsgHandle(); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + // sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1}; + strcpy(n.dbname, "db1"); + strcpy(n.tname, ctgTestTablename); + + code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); + ASSERT_EQ(code, 0); + ASSERT_EQ(vgInfo.vgId, 8); + ASSERT_EQ(vgInfo.epset.numOfEps, 3); + + while (true) { + uint64_t n = 0; + ctgDbgGetStatNum("runtime.qDoneNum", (void *)&n); + if (n > 0) { + break; + } + usleep(50000); + } + + STableMeta *tableMeta = NULL; + code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta); + ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 8); + ASSERT_EQ(tableMeta->tableType, TSDB_NORMAL_TABLE); + ASSERT_EQ(tableMeta->uid, ctgTestNormalTblUid - 1); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, 0); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + tfree(tableMeta); + + while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) { + usleep(50000); + } + + code = catalogRefreshGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta, 0); + ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 8); + ASSERT_EQ(tableMeta->tableType, TSDB_NORMAL_TABLE); + ASSERT_EQ(tableMeta->uid, ctgTestNormalTblUid - 1); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, 0); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + tfree(tableMeta); + + catalogDestroy(); + memset(&gCtgMgmt, 0, sizeof(gCtgMgmt)); +} + +TEST(refreshGetMeta, normal2notexist) { + struct SCatalog *pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo vgInfo = {0}; + SArray *vgList = NULL; + + ctgTestInitLogFile(); + + memset(ctgTestRspFunc, 0, sizeof(ctgTestRspFunc)); + ctgTestRspIdx = 0; + ctgTestRspFunc[0] = CTGT_RSP_VGINFO; + ctgTestRspFunc[1] = CTGT_RSP_TBMETA; + ctgTestRspFunc[2] = CTGT_RSP_TBMETA_NOT_EXIST; + + ctgTestSetRspByIdx(); + + initQueryModuleMsgHandle(); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + // sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1}; + strcpy(n.dbname, "db1"); + strcpy(n.tname, ctgTestTablename); + + code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); + ASSERT_EQ(code, 0); + ASSERT_EQ(vgInfo.vgId, 8); + ASSERT_EQ(vgInfo.epset.numOfEps, 3); + + while (true) { + uint64_t n = 0; + ctgDbgGetStatNum("runtime.qDoneNum", (void *)&n); + if (n > 0) { + break; + } + usleep(50000); + } + + STableMeta *tableMeta = NULL; + code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta); + ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 8); + ASSERT_EQ(tableMeta->tableType, TSDB_NORMAL_TABLE); + ASSERT_EQ(tableMeta->uid, ctgTestNormalTblUid - 1); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, 0); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + tfree(tableMeta); + + while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) { + usleep(50000); + } + + code = catalogRefreshGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta, 0); + ASSERT_EQ(code, CTG_ERR_CODE_TABLE_NOT_EXIST); + ASSERT_TRUE(tableMeta == NULL); + + catalogDestroy(); + memset(&gCtgMgmt, 0, sizeof(gCtgMgmt)); +} + + +TEST(refreshGetMeta, normal2child) { + struct SCatalog *pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo vgInfo = {0}; + SArray *vgList = NULL; + + ctgTestInitLogFile(); + + memset(ctgTestRspFunc, 0, sizeof(ctgTestRspFunc)); + ctgTestRspIdx = 0; + ctgTestRspFunc[0] = CTGT_RSP_VGINFO; + ctgTestRspFunc[1] = CTGT_RSP_TBMETA; + ctgTestRspFunc[2] = CTGT_RSP_CTBMETA; + ctgTestRspFunc[3] = CTGT_RSP_STBMETA; + + ctgTestSetRspByIdx(); + + initQueryModuleMsgHandle(); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + // sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1}; + strcpy(n.dbname, "db1"); + strcpy(n.tname, ctgTestTablename); + ctgTestCurrentCTableName = ctgTestTablename; + ctgTestCurrentSTableName = ctgTestSTablename; + + code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); + ASSERT_EQ(code, 0); + ASSERT_EQ(vgInfo.vgId, 8); + ASSERT_EQ(vgInfo.epset.numOfEps, 3); + + while (true) { + uint64_t n = 0; + ctgDbgGetStatNum("runtime.qDoneNum", (void *)&n); + if (n > 0) { + break; + } + usleep(50000); + } + + STableMeta *tableMeta = NULL; + code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta); + ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 8); + ASSERT_EQ(tableMeta->tableType, TSDB_NORMAL_TABLE); + ASSERT_EQ(tableMeta->uid, ctgTestNormalTblUid - 1); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, 0); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + tfree(tableMeta); + + while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) { + usleep(50000); + } + + code = catalogRefreshGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta, 0); + ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 9); + ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + tfree(tableMeta); + + catalogDestroy(); + memset(&gCtgMgmt, 0, sizeof(gCtgMgmt)); + ctgTestCurrentCTableName = NULL; + ctgTestCurrentSTableName = NULL; +} + +TEST(refreshGetMeta, stable2child) { + struct SCatalog *pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo vgInfo = {0}; + SArray *vgList = NULL; + + ctgTestInitLogFile(); + + memset(ctgTestRspFunc, 0, sizeof(ctgTestRspFunc)); + ctgTestRspIdx = 0; + ctgTestRspFunc[0] = CTGT_RSP_VGINFO; + ctgTestRspFunc[1] = CTGT_RSP_STBMETA; + ctgTestRspFunc[2] = CTGT_RSP_STBMETA; + ctgTestRspFunc[3] = CTGT_RSP_CTBMETA; + ctgTestRspFunc[4] = CTGT_RSP_STBMETA; + + ctgTestSetRspByIdx(); + + initQueryModuleMsgHandle(); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + // sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1}; + strcpy(n.dbname, "db1"); + strcpy(n.tname, ctgTestTablename); + ctgTestCurrentSTableName = ctgTestTablename; + ctgTestCurrentCTableName = ctgTestTablename; + + code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); + ASSERT_EQ(code, 0); + ASSERT_EQ(vgInfo.vgId, 8); + ASSERT_EQ(vgInfo.epset.numOfEps, 3); + + while (true) { + uint64_t n = 0; + ctgDbgGetStatNum("runtime.qDoneNum", (void *)&n); + if (n > 0) { + break; + } + usleep(50000); + } + + STableMeta *tableMeta = NULL; + code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta); + ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 0); + ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->uid, ctgTestSuid - 1); + ASSERT_EQ(tableMeta->suid, ctgTestSuid - 1); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + tfree(tableMeta); + + while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) { + usleep(50000); + } + + ctgTestCurrentSTableName = ctgTestSTablename; + code = catalogRefreshGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta, 0); + ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 9); + ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + tfree(tableMeta); + + catalogDestroy(); + memset(&gCtgMgmt, 0, sizeof(gCtgMgmt)); + ctgTestCurrentCTableName = NULL; + ctgTestCurrentSTableName = NULL; +} + +TEST(refreshGetMeta, stable2stable) { + struct SCatalog *pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo vgInfo = {0}; + SArray *vgList = NULL; + + ctgTestInitLogFile(); + + memset(ctgTestRspFunc, 0, sizeof(ctgTestRspFunc)); + ctgTestRspIdx = 0; + ctgTestRspFunc[0] = CTGT_RSP_VGINFO; + ctgTestRspFunc[1] = CTGT_RSP_STBMETA; + ctgTestRspFunc[2] = CTGT_RSP_STBMETA; + ctgTestRspFunc[3] = CTGT_RSP_STBMETA; + ctgTestRspFunc[4] = CTGT_RSP_STBMETA; + + ctgTestSetRspByIdx(); + + initQueryModuleMsgHandle(); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + // sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1}; + strcpy(n.dbname, "db1"); + strcpy(n.tname, ctgTestTablename); + ctgTestCurrentSTableName = ctgTestTablename; + + code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); + ASSERT_EQ(code, 0); + ASSERT_EQ(vgInfo.vgId, 8); + ASSERT_EQ(vgInfo.epset.numOfEps, 3); + + while (true) { + uint64_t n = 0; + ctgDbgGetStatNum("runtime.qDoneNum", (void *)&n); + if (n > 0) { + break; + } + usleep(50000); + } + + STableMeta *tableMeta = NULL; + code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta); + ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 0); + ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->uid, ctgTestSuid - 1); + ASSERT_EQ(tableMeta->suid, ctgTestSuid - 1); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + tfree(tableMeta); + + while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) { + usleep(50000); + } + + code = catalogRefreshGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta, 0); + ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 0); + ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->uid, ctgTestSuid - 1); + ASSERT_EQ(tableMeta->suid, ctgTestSuid - 1); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + tfree(tableMeta); + + catalogDestroy(); + memset(&gCtgMgmt, 0, sizeof(gCtgMgmt)); + ctgTestCurrentCTableName = NULL; + ctgTestCurrentSTableName = NULL; +} + + +TEST(refreshGetMeta, child2stable) { + struct SCatalog *pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo vgInfo = {0}; + SArray *vgList = NULL; + + ctgTestInitLogFile(); + + memset(ctgTestRspFunc, 0, sizeof(ctgTestRspFunc)); + ctgTestRspIdx = 0; + ctgTestRspFunc[0] = CTGT_RSP_VGINFO; + ctgTestRspFunc[1] = CTGT_RSP_CTBMETA; + ctgTestRspFunc[2] = CTGT_RSP_STBMETA; + ctgTestRspFunc[3] = CTGT_RSP_STBMETA; + ctgTestRspFunc[4] = CTGT_RSP_STBMETA; + + ctgTestSetRspByIdx(); + + initQueryModuleMsgHandle(); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + // sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1}; + strcpy(n.dbname, "db1"); + strcpy(n.tname, ctgTestTablename); + ctgTestCurrentCTableName = ctgTestTablename; + ctgTestCurrentSTableName = ctgTestSTablename; + + code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); + ASSERT_EQ(code, 0); + ASSERT_EQ(vgInfo.vgId, 8); + ASSERT_EQ(vgInfo.epset.numOfEps, 3); + + while (true) { + uint64_t n = 0; + ctgDbgGetStatNum("runtime.qDoneNum", (void *)&n); + if (n > 0) { + break; + } + usleep(50000); + } + + STableMeta *tableMeta = NULL; + code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta); + ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 9); + ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + tfree(tableMeta); + + while (2 != ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) { + usleep(50000); + } + + ctgTestCurrentSTableName = ctgTestTablename; + code = catalogRefreshGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta, 0); + ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 0); + ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->uid, ctgTestSuid - 1); + ASSERT_EQ(tableMeta->suid, ctgTestSuid - 1); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + tfree(tableMeta); + + catalogDestroy(); + memset(&gCtgMgmt, 0, sizeof(gCtgMgmt)); + ctgTestCurrentCTableName = NULL; + ctgTestCurrentSTableName = NULL; +} + + TEST(tableDistVgroup, normalTable) { struct SCatalog *pCtg = NULL; void *mockPointer = (void *)0x1; @@ -1499,11 +2013,15 @@ TEST(dbVgroup, getSetDbVgroupCase) { ASSERT_EQ(code, 0); ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), ctgTestVgNum); - while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_RENT_NUM)) { - usleep(10000); + while (true) { + uint64_t n = 0; + ctgDbgGetStatNum("runtime.qDoneNum", (void *)&n); + if (n > 0) { + break; + } + usleep(50000); } - code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); ASSERT_EQ(code, 0); ASSERT_EQ(vgInfo.vgId, 8); @@ -1525,7 +2043,7 @@ TEST(dbVgroup, getSetDbVgroupCase) { uint64_t n = 0; ctgDbgGetStatNum("runtime.qDoneNum", (void *)&n); if (n != 3) { - usleep(10000); + usleep(50000); } else { break; } @@ -1749,7 +2267,7 @@ TEST(rentTest, allRent) { ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); while (ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM) < i) { - usleep(10000); + usleep(50000); } code = catalogGetExpiredDBs(pCtg, &dbs, &num); -- GitLab