From de439d47875e4e0c873ee2492181a916f32a0db1 Mon Sep 17 00:00:00 2001 From: dapan Date: Sun, 6 Feb 2022 15:37:16 +0800 Subject: [PATCH] feature/qnode --- include/libs/catalog/catalog.h | 6 +- source/client/src/clientHb.c | 127 +++++++++++++++++++++- source/client/src/clientImpl.c | 2 +- source/client/src/clientMsgHandler.c | 2 +- source/dnode/mnode/impl/inc/mndStb.h | 3 + source/dnode/mnode/impl/src/mndDb.c | 6 +- source/dnode/mnode/impl/src/mndProfile.c | 12 +- source/dnode/mnode/impl/src/mndStb.c | 105 +++++++++++++++++- source/libs/catalog/inc/catalogInt.h | 2 +- source/libs/catalog/src/catalog.c | 75 +++++++++++-- source/libs/catalog/test/catalogTests.cpp | 9 +- 11 files changed, 323 insertions(+), 26 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 6f4be556e4..c291ebd8fd 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -53,13 +53,15 @@ typedef struct SCatalogCfg { } SCatalogCfg; typedef struct SSTableMetaVersion { + char dbFName[TSDB_DB_FNAME_LEN]; + char stbName[TSDB_TABLE_NAME_LEN]; uint64_t suid; int16_t sversion; int16_t tversion; } SSTableMetaVersion; typedef struct SDbVgVersion { - char dbName[TSDB_DB_FNAME_LEN]; + char dbFName[TSDB_DB_FNAME_LEN]; int64_t dbId; int32_t vgVersion; } SDbVgVersion; @@ -101,6 +103,8 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB int32_t catalogRemoveDB(struct SCatalog* pCatalog, const char* dbName, uint64_t dbId); +int32_t catalogRemoveSTableMeta(struct SCatalog* pCatalog, const char* dbName, const char* stbName, uint64_t suid); + /** * Get a table's meta data. * @param pCatalog (input, got with catalogGetHandle) diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index b09c3ee9d8..d265ffaa94 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -85,6 +85,75 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog return TSDB_CODE_SUCCESS; } +static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { + int32_t msgLen = 0; + int32_t code = 0; + int32_t schemaNum = 0; + + while (msgLen < valueLen) { + STableMetaRsp *rsp = (STableMetaRsp *)((char *)value + msgLen); + + rsp->numOfColumns = ntohl(rsp->numOfColumns); + rsp->suid = be64toh(rsp->suid); + + if (rsp->numOfColumns < 0) { + schemaNum = 0; + + tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName); + + code = catalogRemoveSTableMeta(pCatalog, rsp->dbFName, rsp->stbName, rsp->suid); + } else { + rsp->numOfTags = ntohl(rsp->numOfTags); + + schemaNum = rsp->numOfColumns + rsp->numOfTags; +/* + rsp->vgNum = ntohl(rsp->vgNum); + rsp->uid = be64toh(rsp->uid); + + SDBVgroupInfo vgInfo = {0}; + vgInfo.dbId = rsp->uid; + vgInfo.vgVersion = rsp->vgVersion; + vgInfo.hashMethod = rsp->hashMethod; + vgInfo.vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (NULL == vgInfo.vgHash) { + tscError("hash init[%d] failed", rsp->vgNum); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + for (int32_t i = 0; i < rsp->vgNum; ++i) { + rsp->vgroupInfo[i].vgId = ntohl(rsp->vgroupInfo[i].vgId); + rsp->vgroupInfo[i].hashBegin = ntohl(rsp->vgroupInfo[i].hashBegin); + rsp->vgroupInfo[i].hashEnd = ntohl(rsp->vgroupInfo[i].hashEnd); + + for (int32_t n = 0; n < rsp->vgroupInfo[i].epset.numOfEps; ++n) { + rsp->vgroupInfo[i].epset.eps[n].port = ntohs(rsp->vgroupInfo[i].epset.eps[n].port); + } + + if (0 != taosHashPut(vgInfo.vgHash, &rsp->vgroupInfo[i].vgId, sizeof(rsp->vgroupInfo[i].vgId), &rsp->vgroupInfo[i], sizeof(rsp->vgroupInfo[i]))) { + tscError("hash push failed, errno:%d", errno); + taosHashCleanup(vgInfo.vgHash); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + } + + code = catalogUpdateDBVgroup(pCatalog, rsp->db, &vgInfo); + if (code) { + taosHashCleanup(vgInfo.vgHash); + } +*/ + } + + if (code) { + return code; + } + + msgLen += sizeof(STableMetaRsp) + schemaNum * sizeof(SSchema); + } + + return TSDB_CODE_SUCCESS; +} + + static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) { SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey)); if (NULL == info) { @@ -117,9 +186,24 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog); break; } - case HEARTBEAT_KEY_STBINFO: + case HEARTBEAT_KEY_STBINFO:{ + if (kv->valueLen <= 0 || NULL == kv->value) { + tscError("invalid hb stb info, len:%d, value:%p", kv->valueLen, kv->value); + break; + } + int64_t *clusterId = (int64_t *)info->param; + struct SCatalog *pCatalog = NULL; + + int32_t code = catalogGetHandle(*clusterId, &pCatalog); + if (code != TSDB_CODE_SUCCESS) { + tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code)); + break; + } + + hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog); break; + } default: tscError("invalid hb key type:%d", kv->key); break; @@ -152,7 +236,7 @@ static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code tfree(param); if (rspNum) { - tscDebug("hb got %d rsp, %d empty rsp prior", rspNum, atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0)); + tscDebug("hb got %d rsp, %d empty rsp received before", rspNum, atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0)); } else { atomic_add_fetch_32(&emptyRspNum, 1); } @@ -199,6 +283,37 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl return TSDB_CODE_SUCCESS; } +int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) { + SSTableMetaVersion *stbs = NULL; + uint32_t stbNum = 0; + int32_t code = 0; + + code = catalogGetExpiredSTables(pCatalog, &stbs, &stbNum); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + if (stbNum <= 0) { + return TSDB_CODE_SUCCESS; + } + + for (int32_t i = 0; i < stbNum; ++i) { + SSTableMetaVersion *stb = &stbs[i]; + stb->suid = htobe64(stb->suid); + stb->sversion = htons(stb->sversion); + stb->tversion = htons(stb->tversion); + } + + SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = sizeof(SSTableMetaVersion) * stbNum, .value = stbs}; + + tscDebug("hb got %d expired stb, valueLen:%d", stbNum, kv.valueLen); + + taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); + + return TSDB_CODE_SUCCESS; +} + + int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) { int64_t *clusterId = (int64_t *)param; struct SCatalog *pCatalog = NULL; @@ -214,6 +329,11 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req return code; } + code = hbGetExpiredStbInfo(connKey, pCatalog, req); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + return TSDB_CODE_SUCCESS; } @@ -384,7 +504,6 @@ static void hbStopThread() { } SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) { - return NULL; hbMgrInit(); SAppHbMgr* pAppHbMgr = malloc(sizeof(SAppHbMgr)); if (pAppHbMgr == NULL) { @@ -442,7 +561,6 @@ void appHbMgrCleanup(void) { } int hbMgrInit() { - return 0; // init once int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1); if (old == 1) return 0; @@ -499,7 +617,6 @@ int hbRegisterConnImpl(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, SHbConnInfo * } int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) { - return 0; SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY}; SHbConnInfo info = {0}; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index c60b89f78e..ff498e68d1 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -109,7 +109,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo)); p->mgmtEp = epSet; p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores); - /*p->pAppHbMgr = appHbMgrInit(p, key);*/ + p->pAppHbMgr = appHbMgrInit(p, key); taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES); pInst = &p; diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 0652618e6c..8ab1880069 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -76,7 +76,7 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { pTscObj->connType = HEARTBEAT_TYPE_QUERY; - /*hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pConnect->connId, pConnect->clusterId, HEARTBEAT_TYPE_QUERY);*/ + hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pConnect->connId, pConnect->clusterId, HEARTBEAT_TYPE_QUERY); // pRequest->body.resInfo.pRspMsg = pMsg->pData; tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId, diff --git a/source/dnode/mnode/impl/inc/mndStb.h b/source/dnode/mnode/impl/inc/mndStb.h index 48847dc6a3..0855b5bd4d 100644 --- a/source/dnode/mnode/impl/inc/mndStb.h +++ b/source/dnode/mnode/impl/inc/mndStb.h @@ -28,6 +28,9 @@ void mndCleanupStb(SMnode *pMnode); SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName); void mndReleaseStb(SMnode *pMnode, SStbObj *pStb); +int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num, void **rsp, int32_t *rspLen); + + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index f71f3ac478..1bfe8c9d9a 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -907,9 +907,9 @@ int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void * len = 0; - SDbObj *pDb = mndAcquireDb(pMnode, db->dbName); + SDbObj *pDb = mndAcquireDb(pMnode, db->dbFName); if (pDb == NULL) { - mInfo("db %s not exist", db->dbName); + mInfo("db %s not exist", db->dbFName); len = sizeof(SUseDbRsp); } else if (pDb->uid != db->dbId || db->vgVersion < pDb->vgVersion) { @@ -929,7 +929,7 @@ int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void * } pRsp = (SUseDbRsp *)((char *)buf + bufOffset); - memcpy(pRsp->db, db->dbName, TSDB_DB_FNAME_LEN); + memcpy(pRsp->db, db->dbFName, TSDB_DB_FNAME_LEN); if (pDb) { int32_t vgNum = 0; mndBuildDBVgroupInfo(pDb, pMnode, pRsp->vgroupInfo, &vgNum); diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 3d9e3ff3de..d63ade4320 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -18,6 +18,7 @@ #include "mndProfile.h" //#include "mndConsumer.h" #include "mndDb.h" +#include "mndStb.h" #include "mndMnode.h" #include "mndShow.h" //#include "mndTopic.h" @@ -376,9 +377,16 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { } break; } - case HEARTBEAT_KEY_STBINFO: - + case HEARTBEAT_KEY_STBINFO: { + void *rspMsg = NULL; + int32_t rspLen = 0; + mndValidateStbInfo(pMnode, (SSTableMetaVersion *)kv->value, kv->valueLen/sizeof(SSTableMetaVersion), &rspMsg, &rspLen); + if (rspMsg && rspLen > 0) { + SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg}; + taosArrayPush(hbRsp.info, &kv); + } break; + } default: mError("invalid kv key:%d", kv->key); hbRsp.status = TSDB_CODE_MND_APP_ERROR; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 9944c0c7fc..4ccd4b63c4 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -728,7 +728,7 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) { mDebug("stb:%s, start to retrieve meta", tbFName); - SDbObj *pDb = mndAcquireDbByStb(pMnode, tbFName); + SDbObj *pDb = mndAcquireDb(pMnode, pInfo->dbFName); if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_SELECTED; mError("stb:%s, failed to retrieve meta since %s", tbFName, terrstr()); @@ -788,6 +788,109 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) { return 0; } +int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num, void **rsp, int32_t *rspLen) { + SSdb *pSdb = pMnode->pSdb; + int32_t bufSize = num * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema)); + void *buf = malloc(bufSize); + int32_t len = 0; + int32_t contLen = 0; + STableMetaRsp *pRsp = NULL; + + for (int32_t i = 0; i < num; ++i) { + SSTableMetaVersion *stb = &stbs[i]; + stb->suid = be64toh(stb->suid); + stb->sversion = ntohs(stb->sversion); + stb->tversion = ntohs(stb->tversion); + + if ((contLen + sizeof(STableMetaRsp)) > bufSize) { + bufSize = contLen + (num -i) * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema)); + buf = realloc(buf, bufSize); + } + + pRsp = (STableMetaRsp *)((char *)buf + contLen); + + strcpy(pRsp->dbFName, stb->dbFName); + strcpy(pRsp->tbName, stb->stbName); + strcpy(pRsp->stbName, stb->stbName); + + mDebug("start to retrieve meta, db:%s, stb:%s", stb->dbFName, stb->stbName); + + SDbObj *pDb = mndAcquireDb(pMnode, stb->dbFName); + if (pDb == NULL) { + pRsp->numOfColumns = -1; + pRsp->suid = htobe64(stb->suid); + contLen += sizeof(STableMetaRsp); + mWarn("db:%s, failed to require db since %s", stb->dbFName, terrstr()); + continue; + } + + char tbFName[TSDB_TABLE_FNAME_LEN] = {0}; + snprintf(tbFName, sizeof(tbFName), "%s.%s", stb->dbFName, stb->stbName); + + SStbObj *pStb = mndAcquireStb(pMnode, tbFName); + if (pStb == NULL) { + mndReleaseDb(pMnode, pDb); + pRsp->numOfColumns = -1; + pRsp->suid = htobe64(stb->suid); + contLen += sizeof(STableMetaRsp); + mWarn("stb:%s, failed to get meta since %s", tbFName, terrstr()); + continue; + } + + taosRLockLatch(&pStb->lock); + + if (stb->suid == pStb->uid && stb->sversion == pStb->version) { + taosRUnLockLatch(&pStb->lock); + mndReleaseDb(pMnode, pDb); + mndReleaseStb(pMnode, pStb); + continue; + } + + int32_t totalCols = pStb->numOfColumns + pStb->numOfTags; + int32_t len = totalCols * sizeof(SSchema); + + contLen += sizeof(STableMetaRsp) + len; + + if (contLen > bufSize) { + bufSize = contLen + (num -i - 1) * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema)); + buf = realloc(buf, bufSize); + } + + pRsp->numOfTags = htonl(pStb->numOfTags); + pRsp->numOfColumns = htonl(pStb->numOfColumns); + pRsp->precision = pDb->cfg.precision; + pRsp->tableType = TSDB_SUPER_TABLE; + pRsp->update = pDb->cfg.update; + pRsp->sversion = htonl(pStb->version); + pRsp->suid = htobe64(pStb->uid); + pRsp->tuid = htobe64(pStb->uid); + + for (int32_t i = 0; i < totalCols; ++i) { + SSchema *pSchema = &pRsp->pSchema[i]; + SSchema *pSrcSchema = &pStb->pSchema[i]; + memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN); + pSchema->type = pSrcSchema->type; + pSchema->colId = htonl(pSrcSchema->colId); + pSchema->bytes = htonl(pSrcSchema->bytes); + } + taosRUnLockLatch(&pStb->lock); + mndReleaseDb(pMnode, pDb); + mndReleaseStb(pMnode, pStb); + } + + if (contLen > 0) { + *rsp = buf; + *rspLen = contLen; + } else { + *rsp = NULL; + tfree(buf); + *rspLen = 0; + } + + return 0; +} + + static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs) { SSdb *pSdb = pMnode->pSdb; diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 22997feb3c..9c041d76c7 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -27,7 +27,7 @@ extern "C" { #define CTG_DEFAULT_CACHE_CLUSTER_NUMBER 6 #define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100 #define CTG_DEFAULT_CACHE_DB_NUMBER 20 -#define CTG_DEFAULT_CACHE_TABLEMETA_NUMBER 100000 +#define CTG_DEFAULT_CACHE_TABLEMETA_NUMBER 10000 #define CTG_DEFAULT_RENT_SECOND 10 #define CTG_DEFAULT_RENT_SLOT_SIZE 10 diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 95e60415c7..02773fe533 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -527,9 +527,9 @@ int32_t ctgGetVgInfoFromHashValue(struct SCatalog *pCatalog, SDBVgroupInfo *dbIn } int32_t ctgSTableVersionCompare(const void* key1, const void* key2) { - if (((SSTableMetaVersion*)key1)->suid < ((SSTableMetaVersion*)key2)->suid) { + if (*(uint64_t *)key1 < ((SSTableMetaVersion*)key2)->suid) { return -1; - } else if (((SSTableMetaVersion*)key1)->suid > ((SSTableMetaVersion*)key2)->suid) { + } else if (*(uint64_t *)key1 > ((SSTableMetaVersion*)key2)->suid) { return 1; } else { return 0; @@ -557,7 +557,7 @@ int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) { mgmt->slots = calloc(1, msgSize); if (NULL == mgmt->slots) { qError("calloc %d failed", (int32_t)msgSize); - return TSDB_CODE_CTG_MEM_ERROR; + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum); @@ -825,6 +825,8 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out if (TSDB_SUPER_TABLE == output->tbMeta->tableType) { bool newAdded = false; SSTableMetaVersion metaRent = {.suid = output->tbMeta->suid, .sversion = output->tbMeta->sversion, .tversion = output->tbMeta->tversion}; + strcpy(metaRent.dbFName, output->dbFName); + strcpy(metaRent.stbName, output->tbName); CTG_LOCK(CTG_WRITE, &dbCache->tbCache.stbLock); if (taosHashPut(dbCache->tbCache.cache, output->tbName, strlen(output->tbName), output->tbMeta, tbSize) != 0) { @@ -951,6 +953,39 @@ int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, ui return TSDB_CODE_SUCCESS; } +int32_t ctgValidateAndRemoveStbMeta(struct SCatalog* pCatalog, const char* dbName, const char* stbName, uint64_t suid, bool *removed) { + *removed = false; + + SCtgDBCache *dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, dbName, strlen(dbName)); + if (NULL == dbCache) { + ctgInfo("db not exist in dbCache, may be removed, db:%s", dbName); + return TSDB_CODE_SUCCESS; + } + + CTG_LOCK(CTG_WRITE, &dbCache->tbCache.stbLock); + if (taosHashRemove(dbCache->tbCache.stbCache, &suid, sizeof(suid))) { + CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock); + taosHashRelease(pCatalog->dbCache, dbCache); + ctgInfo("stb not exist in stbCache, may be removed, db:%s, stb:%s, suid:%"PRIx64, dbName, stbName, suid); + return TSDB_CODE_SUCCESS; + } + + if (taosHashRemove(dbCache->tbCache.cache, stbName, strlen(stbName))) { + CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock); + taosHashRelease(pCatalog->dbCache, dbCache); + ctgError("stb not exist in cache, db:%s, stb:%s, suid:%"PRIx64, dbName, stbName, suid); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock); + + taosHashRelease(pCatalog->dbCache, dbCache); + + *removed = true; + + return TSDB_CODE_SUCCESS; +} + + int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) { if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) { @@ -1065,7 +1100,7 @@ int32_t ctgGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMg int32_t catalogInit(SCatalogCfg *cfg) { if (ctgMgmt.pCluster) { - qError("catalog already init"); + qError("catalog already initialized"); CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } @@ -1111,7 +1146,7 @@ int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle) { } if (NULL == ctgMgmt.pCluster) { - qError("cluster cache are not ready, clusterId:%"PRIx64, clusterId); + qError("catalog cluster cache are not ready, clusterId:%"PRIx64, clusterId); CTG_ERR_RET(TSDB_CODE_CTG_NOT_READY); } @@ -1312,7 +1347,7 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB ctgMetaRentRemove(&pCatalog->dbRent, dbCache->vgInfo->dbId, ctgDbVgVersionCompare); newAdded = true; } else if (dbInfo->vgVersion <= dbCache->vgInfo->vgVersion) { - ctgInfo("db vgVersion is not new, db:%s, vgVersion:%d, current:%d", dbName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion); + ctgInfo("db vgVersion is old, db:%s, vgVersion:%d, current:%d", dbName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion); CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); taosHashRelease(pCatalog->dbCache, dbCache); @@ -1345,7 +1380,7 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB dbInfo = NULL; - strncpy(vgVersion.dbName, dbName, sizeof(vgVersion.dbName)); + strncpy(vgVersion.dbFName, dbName, sizeof(vgVersion.dbFName)); if (newAdded) { CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion))); @@ -1394,6 +1429,32 @@ int32_t catalogRemoveDB(struct SCatalog* pCatalog, const char* dbName, uint64_t CTG_RET(code); } +int32_t catalogRemoveSTableMeta(struct SCatalog* pCatalog, const char* dbName, const char* stbName, uint64_t suid) { + int32_t code = 0; + bool removed = false; + + if (NULL == pCatalog || NULL == dbName || NULL == stbName) { + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); + } + + if (NULL == pCatalog->dbCache) { + return TSDB_CODE_SUCCESS; + } + + CTG_ERR_RET(ctgValidateAndRemoveStbMeta(pCatalog, dbName, stbName, suid, &removed)); + if (!removed) { + return TSDB_CODE_SUCCESS; + } + + ctgInfo("stb removed from cache, db:%s, stbName:%s, suid:%"PRIx64, dbName, stbName, suid); + + CTG_ERR_RET(ctgMetaRentRemove(&pCatalog->stbRent, suid, ctgSTableVersionCompare)); + + ctgDebug("stb removed from rent, db:%s, stbName:%s, suid:%"PRIx64, dbName, stbName, suid); + + CTG_RET(code); +} + int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, -1); diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index bfb950e5db..751fa72347 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -748,7 +748,7 @@ TEST(tableMeta, normalTable) { } if (stbNum) { - printf("got expired stb,suid:%" PRId64 "\n", stb->suid); + printf("got expired stb,suid:%" PRId64 ",dbFName:%s, stbName:%s\n", stb->suid, stb->dbFName, stb->stbName); free(stb); stb = NULL; } else { @@ -844,7 +844,7 @@ TEST(tableMeta, childTableCase) { } if (stbNum) { - printf("got expired stb,suid:%" PRId64 "\n", stb->suid); + printf("got expired stb,suid:%" PRId64 ",dbFName:%s, stbName:%s\n", stb->suid, stb->dbFName, stb->stbName); free(stb); stb = NULL; } else { @@ -945,7 +945,8 @@ TEST(tableMeta, superTableCase) { } if (stbNum) { - printf("got expired stb,suid:%" PRId64 "\n", stb->suid); + printf("got expired stb,suid:%" PRId64 ",dbFName:%s, stbName:%s\n", stb->suid, stb->dbFName, stb->stbName); + free(stb); stb = NULL; } else { @@ -1289,7 +1290,7 @@ TEST(rentTest, allRent) { printf("%d - expired stableNum:%d\n", i, num); if (stable) { for (int32_t n = 0; n < num; ++n) { - printf("suid:%" PRId64 ", sversion:%d, tversion:%d\n", stable[n].suid, stable[n].sversion, stable[n].tversion); + printf("suid:%" PRId64 ", dbFName:%s, stbName:%s, sversion:%d, tversion:%d\n", stable[n].suid, stable[n].dbFName, stable[n].stbName, stable[n].sversion, stable[n].tversion); } free(stable); stable = NULL; -- GitLab