From 639b5102ce1d2230e43806a2652f488c0a977bcf Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 1 Mar 2022 19:48:21 +0800 Subject: [PATCH] feature/qnode --- include/common/tmsg.h | 2 + include/libs/catalog/catalog.h | 2 +- include/libs/qcom/query.h | 2 + source/client/src/clientMsgHandler.c | 38 ++++++++ source/common/src/tmsg.c | 2 + source/dnode/mnode/impl/src/mndDb.c | 53 ++++++----- source/dnode/mnode/impl/src/mnode.c | 2 +- source/libs/catalog/src/catalog.c | 109 +++++++++++++++------- source/libs/catalog/test/catalogTests.cpp | 5 +- source/libs/parser/src/dCDAstProcess.c | 1 + source/libs/qcom/src/querymsg.c | 56 +++++------ 11 files changed, 184 insertions(+), 88 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index a739e51d6b..e7b2dfed89 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -167,6 +167,7 @@ typedef struct { typedef struct { char db[TSDB_DB_FNAME_LEN]; + int64_t dbId; int32_t vgVersion; } SBuildUseDBInput; @@ -563,6 +564,7 @@ int32_t tDeserializeSDropDbRsp(void* buf, int32_t bufLen, SDropDbRsp* pRsp); typedef struct { char db[TSDB_DB_FNAME_LEN]; + int64_t dbId; int32_t vgVersion; } SUseDbReq; diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index ebe7c92d31..c6183780f9 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -95,7 +95,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle); */ void catalogFreeHandle(SCatalog* pCatalog); -int32_t catalogGetDBVgVersion(SCatalog* pCatalog, const char* dbName, int32_t* version); +int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId); /** * Get a DB's all vgroup info. diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 76f23da1e3..6d3f97fc4e 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -156,6 +156,8 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); */ int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); +int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp); + void initQueryModuleMsgHandle(); const SSchema* tGetTbnameColumnSchema(); diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 454fb7f1fe..9534c11646 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -19,6 +19,7 @@ #include "clientInt.h" #include "clientLog.h" #include "catalog.h" +#include "query.h" int32_t (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code); @@ -243,6 +244,23 @@ int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { SRequestObj* pRequest = param; + if (TSDB_CODE_MND_DB_NOT_EXIST == code) { + SUseDbRsp usedbRsp = {0}; + tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp); + struct SCatalog *pCatalog = NULL; + + if (usedbRsp.vgVersion >= 0) { + int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); + if (code != TSDB_CODE_SUCCESS) { + tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", pRequest->pTscObj->pAppInfo->clusterId, tstrerror(code)); + } else { + catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid); + } + } + + tFreeSUsedbRsp(&usedbRsp); + } + if (code != TSDB_CODE_SUCCESS) { free(pMsg->pData); setErrno(pRequest, code); @@ -256,6 +274,26 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { SName name = {0}; tNameFromString(&name, usedbRsp.db, T_NAME_ACCT|T_NAME_DB); + SUseDbOutput output = {0}; + code = queryBuildUseDbOutput(&output, &usedbRsp); + + if (code != 0) { + terrno = code; + if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash); + tfree(output.dbVgroup); + + tscError("failed to build use db output since %s", terrstr()); + } else { + struct SCatalog *pCatalog = NULL; + + int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); + if (code != TSDB_CODE_SUCCESS) { + tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", pRequest->pTscObj->pAppInfo->clusterId, tstrerror(code)); + } else { + catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup); + } + } + tFreeSUsedbRsp(&usedbRsp); char db[TSDB_DB_NAME_LEN] = {0}; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 8a3cd0a718..8de3906388 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1421,6 +1421,7 @@ int32_t tSerializeSUseDbReq(void *buf, int32_t bufLen, SUseDbReq *pReq) { if (tStartEncode(&encoder) < 0) return -1; if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; + if (tEncodeI64(&encoder, pReq->dbId) < 0) return -1; if (tEncodeI32(&encoder, pReq->vgVersion) < 0) return -1; tEndEncode(&encoder); @@ -1435,6 +1436,7 @@ int32_t tDeserializeSUseDbReq(void *buf, int32_t bufLen, SUseDbReq *pReq) { if (tStartDecode(&decoder) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->dbId) < 0) return -1; if (tDecodeI32(&decoder, &pReq->vgVersion) < 0) return -1; tEndDecode(&decoder); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 9165fa2264..974f5fc982 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -940,43 +940,51 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { char *p = strchr(usedbReq.db, '.'); if (p && 0 == strcmp(p + 1, TSDB_INFORMATION_SCHEMA_DB)) { memcpy(usedbRsp.db, usedbReq.db, TSDB_DB_FNAME_LEN); + code = 0; } else { pDb = mndAcquireDb(pMnode, usedbReq.db); if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_EXIST; - goto USE_DB_OVER; - } - pUser = mndAcquireUser(pMnode, pReq->user); - if (pUser == NULL) { - goto USE_DB_OVER; - } + memcpy(usedbRsp.db, usedbReq.db, TSDB_DB_FNAME_LEN); + usedbRsp.uid = usedbReq.dbId; + usedbRsp.vgVersion = usedbReq.vgVersion; - if (mndCheckUseDbAuth(pUser, pDb) != 0) { - goto USE_DB_OVER; - } + mError("db:%s, failed to process use db req since %s", usedbReq.db, terrstr()); + } else { + pUser = mndAcquireUser(pMnode, pReq->user); + if (pUser == NULL) { + goto USE_DB_OVER; + } - usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo)); - if (usedbRsp.pVgroupInfos == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto USE_DB_OVER; - } + if (mndCheckUseDbAuth(pUser, pDb) != 0) { + goto USE_DB_OVER; + } - if (usedbReq.vgVersion < pDb->vgVersion) { - mndBuildDBVgroupInfo(pDb, pMnode, usedbRsp.pVgroupInfos); - } + usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo)); + if (usedbRsp.pVgroupInfos == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto USE_DB_OVER; + } + + if (usedbReq.vgVersion < pDb->vgVersion || usedbReq.dbId != pDb->uid) { + mndBuildDBVgroupInfo(pDb, pMnode, usedbRsp.pVgroupInfos); + } - memcpy(usedbRsp.db, pDb->name, TSDB_DB_FNAME_LEN); - usedbRsp.uid = pDb->uid; - usedbRsp.vgVersion = pDb->vgVersion; - usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos); - usedbRsp.hashMethod = pDb->hashMethod; + memcpy(usedbRsp.db, pDb->name, TSDB_DB_FNAME_LEN); + usedbRsp.uid = pDb->uid; + usedbRsp.vgVersion = pDb->vgVersion; + usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos); + usedbRsp.hashMethod = pDb->hashMethod; + code = 0; + } } int32_t contLen = tSerializeSUseDbRsp(NULL, 0, &usedbRsp); void *pRsp = rpcMallocCont(contLen); if (pRsp == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + code = -1; goto USE_DB_OVER; } @@ -984,7 +992,6 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { pReq->pCont = pRsp; pReq->contLen = contLen; - code = 0; USE_DB_OVER: if (code != 0) { diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index ab5d0d794b..90d66a80fd 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -489,7 +489,7 @@ PROCESS_RPC_END: if (code == TSDB_CODE_APP_NOT_READY) { mndSendRedirectRsp(pMnode, &pMsg->rpcMsg); } else if (code != 0) { - SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .code = code}; + SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont, .code = code}; rpcSendResponse(&rpcRsp); } else { SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont}; diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 2b6caa2f8f..f0ea51c2f9 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -166,6 +166,28 @@ void ctgDbgShowDBCache(SHashObj *dbHash) { taosHashGetKey(dbCache, (void **)&dbFName, &len); CTG_CACHE_DEBUG("** %dth db [%.*s][%"PRIx64"] **", i, (int32_t)len, dbFName, dbCache->dbId); + + CTG_CACHE_DEBUG("deleted: %d", dbCache->deleted); + if (dbCache->vgInfo) { + CTG_CACHE_DEBUG("vgVersion: %d", dbCache->vgInfo->vgVersion); + CTG_CACHE_DEBUG("hashMethod: %d", dbCache->vgInfo->hashMethod); + if (dbCache->vgInfo->vgHash) { + CTG_CACHE_DEBUG("vgNum: %d", taosHashGetSize(dbCache->vgInfo->vgHash)); + //TODO + } else { + CTG_CACHE_DEBUG("vgHash: %p", dbCache->vgInfo->vgHash); + } + } else { + CTG_CACHE_DEBUG("vgInfo: %p", dbCache->vgInfo); + } + + if (dbCache->tbCache.metaCache) { + CTG_CACHE_DEBUG("metaNum: %d", taosHashGetSize(dbCache->tbCache.metaCache)); + } + + if (dbCache->tbCache.stbCache) { + CTG_CACHE_DEBUG("stbNum: %d", taosHashGetSize(dbCache->tbCache.stbCache)); + } pIter = taosHashIterate(dbHash, pIter); } @@ -242,6 +264,34 @@ void ctgFreeMetaRent(SCtgRentMgmt *mgmt) { } +int32_t ctgPushRmDBMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) { + int32_t code = 0; + SCtgMetaAction action= {.act = CTG_ACT_REMOVE_DB}; + SCtgRemoveDBMsg *msg = malloc(sizeof(SCtgRemoveDBMsg)); + if (NULL == msg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveDBMsg)); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + msg->pCtg = pCtg; + strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); + msg->dbId = dbId; + + action.data = msg; + + CTG_ERR_JRET(ctgPushAction(&action)); + + ctgDebug("action [%s] added into queue", gCtgAction[action.act].name); + + return TSDB_CODE_SUCCESS; + +_return: + + tfree(action.data); + CTG_RET(code); +} + + void ctgFreeTableMetaCache(SCtgTbMetaCache *cache) { CTG_LOCK(CTG_WRITE, &cache->stbLock); if (cache->stbCache) { @@ -452,12 +502,7 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtE rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); if (TSDB_CODE_SUCCESS != rpcRsp.code) { - if (CTG_DB_NOT_EXIST(rpcRsp.code)) { - ctgDebug("db not exist in mnode, dbFName:%s", input->db); - return rpcRsp.code; - } - - ctgError("error rsp for use db, code:%s, db:%s", tstrerror(rpcRsp.code), input->db); + ctgError("error rsp for use db, error:%s, db:%s", tstrerror(rpcRsp.code), input->db); CTG_ERR_RET(rpcRsp.code); } @@ -1365,20 +1410,33 @@ int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) { int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SCtgDBCache** dbCache, SDBVgInfo **pInfo) { bool inCache = false; int32_t code = 0; - if (!forceUpdate) { - CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache, &inCache)); - if (inCache) { - return TSDB_CODE_SUCCESS; - } + + CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache, &inCache)); + + if (inCache && !forceUpdate) { + return TSDB_CODE_SUCCESS; } SUseDbOutput DbOut = {0}; SBuildUseDBInput input = {0}; tstrncpy(input.db, dbFName, tListLen(input.db)); - input.vgVersion = CTG_DEFAULT_INVALID_VERSION; + if (inCache) { + input.dbId = (*dbCache)->dbId; + input.vgVersion = (*dbCache)->vgInfo->vgVersion; + } else { + input.vgVersion = CTG_DEFAULT_INVALID_VERSION; + } - CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pRpc, pMgmtEps, &input, &DbOut)); + code = ctgGetDBVgInfoFromMnode(pCtg, pRpc, pMgmtEps, &input, &DbOut); + if (code) { + if (CTG_DB_NOT_EXIST(code) && input.vgVersion > CTG_DEFAULT_INVALID_VERSION) { + ctgDebug("db no longer exist, dbFName:%s, dbId:%" PRIx64, input.db, input.dbId); + ctgPushRmDBMsgInQueue(pCtg, input.db, input.dbId); + } + + CTG_ERR_RET(code); + } CTG_ERR_JRET(ctgCloneVgInfo(DbOut.dbVgroup, pInfo)); @@ -1772,7 +1830,6 @@ int32_t ctgActRemoveTbl(SCtgMetaAction *action) { } - void* ctgUpdateThreadFunc(void* param) { setThreadName("catalog"); @@ -1964,10 +2021,10 @@ void catalogFreeHandle(SCatalog* pCtg) { ctgInfo("handle freed, culsterId:%"PRIx64, clusterId); } -int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version) { +int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId) { CTG_API_ENTER(); - if (NULL == pCtg || NULL == dbFName || NULL == version) { + if (NULL == pCtg || NULL == dbFName || NULL == version || NULL == dbId) { CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } @@ -1994,6 +2051,7 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers } *version = dbCache->vgInfo->vgVersion; + *dbId = dbCache->dbId; ctgReleaseVgInfo(dbCache); ctgReleaseDBCache(pCtg, dbCache); @@ -2099,29 +2157,12 @@ int32_t catalogRemoveDB(SCatalog* pCtg, const char* dbFName, uint64_t dbId) { CTG_API_LEAVE(TSDB_CODE_SUCCESS); } - SCtgMetaAction action= {.act = CTG_ACT_REMOVE_DB}; - SCtgRemoveDBMsg *msg = malloc(sizeof(SCtgRemoveDBMsg)); - if (NULL == msg) { - ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveDBMsg)); - CTG_API_LEAVE(TSDB_CODE_CTG_MEM_ERROR); - } - - msg->pCtg = pCtg; - strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); - msg->dbId = dbId; - - action.data = msg; - - CTG_ERR_JRET(ctgPushAction(&action)); - - ctgDebug("action [%s] added into queue", gCtgAction[action.act].name); + CTG_ERR_JRET(ctgPushRmDBMsgInQueue(pCtg, dbFName, dbId)); CTG_API_LEAVE(TSDB_CODE_SUCCESS); _return: - tfree(action.data); - CTG_API_LEAVE(code); } diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index f3aecadd3f..177592ff38 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -128,6 +128,7 @@ void ctgTestInitLogFile() { tsAsyncLog = 0; qDebugFlag = 159; + strcpy(tsLogDir, "/var/log/taos"); ctgDbgEnableDebug("api"); @@ -827,7 +828,7 @@ void *ctgTestSetCtableMetaThread(void *param) { return NULL; } -#if 0 +#if 1 TEST(tableMeta, normalTable) { @@ -1498,7 +1499,7 @@ TEST(dbVgroup, getSetDbVgroupCase) { ASSERT_EQ(code, 0); ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), ctgTestVgNum); - while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM)) { + while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_RENT_NUM)) { usleep(10000); } diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index 6131d50c12..713847807d 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -762,6 +762,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch SUseDbReq usedbReq = {0}; tNameExtractFullName(&n, usedbReq.db); + catalogGetDBVgVersion(pCtx->pCatalog, usedbReq.db, &usedbReq.vgVersion, &usedbReq.dbId); int32_t bufLen = tSerializeSUseDbReq(NULL, 0, &usedbReq); void* pBuf = malloc(bufLen); diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 8bc29656d4..2a52e01dc1 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -24,6 +24,32 @@ int32_t (*queryBuildMsg[TDMT_MAX])(void *input, char **msg, int32_t msgSize, int32_t *msgLen) = {0}; int32_t (*queryProcessMsgRsp[TDMT_MAX])(void *output, char *msg, int32_t msgSize) = {0}; +int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) { + memcpy(pOut->db, usedbRsp->db, TSDB_DB_FNAME_LEN); + pOut->dbId = usedbRsp->uid; + pOut->dbVgroup = calloc(1, sizeof(SDBVgInfo)); + if (NULL == pOut->dbVgroup) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + pOut->dbVgroup->vgVersion = usedbRsp->vgVersion; + pOut->dbVgroup->hashMethod = usedbRsp->hashMethod; + pOut->dbVgroup->vgHash = + taosHashInit(usedbRsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (NULL == pOut->dbVgroup->vgHash) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + for (int32_t i = 0; i < usedbRsp->vgNum; ++i) { + SVgroupInfo *pVgInfo = taosArrayGet(usedbRsp->pVgroupInfos, i); + if (0 != taosHashPut(pOut->dbVgroup->vgHash, &pVgInfo->vgId, sizeof(int32_t), pVgInfo, sizeof(SVgroupInfo))) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + } + + return TSDB_CODE_SUCCESS; +} + int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) { SBuildTableMetaInput *pInput = input; if (NULL == input || NULL == msg || NULL == msgLen) { @@ -57,6 +83,7 @@ int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *ms strncpy(usedbReq.db, pInput->db, sizeof(usedbReq.db)); usedbReq.db[sizeof(usedbReq.db) - 1] = 0; usedbReq.vgVersion = pInput->vgVersion; + usedbReq.dbId = pInput->dbId; int32_t bufLen = tSerializeSUseDbReq(NULL, 0, &usedbReq); void *pBuf = rpcMallocCont(bufLen); @@ -90,35 +117,10 @@ int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) { goto PROCESS_USEDB_OVER; } - memcpy(pOut->db, usedbRsp.db, TSDB_DB_FNAME_LEN); - pOut->dbId = usedbRsp.uid; - pOut->dbVgroup = calloc(1, sizeof(SDBVgInfo)); - if (NULL == pOut->dbVgroup) { - code = TSDB_CODE_TSC_OUT_OF_MEMORY; - goto PROCESS_USEDB_OVER; - } - - pOut->dbVgroup->vgVersion = usedbRsp.vgVersion; - pOut->dbVgroup->hashMethod = usedbRsp.hashMethod; - pOut->dbVgroup->vgHash = - taosHashInit(usedbRsp.vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); - if (NULL == pOut->dbVgroup->vgHash) { - tfree(pOut->dbVgroup); - code = TSDB_CODE_TSC_OUT_OF_MEMORY; - goto PROCESS_USEDB_OVER; - } - - for (int32_t i = 0; i < usedbRsp.vgNum; ++i) { - SVgroupInfo *pVgInfo = taosArrayGet(usedbRsp.pVgroupInfos, i); - if (0 != taosHashPut(pOut->dbVgroup->vgHash, &pVgInfo->vgId, sizeof(int32_t), pVgInfo, sizeof(SVgroupInfo))) { - code = TSDB_CODE_TSC_OUT_OF_MEMORY; - goto PROCESS_USEDB_OVER; - } - } - - code = 0; + code = queryBuildUseDbOutput(pOut, &usedbRsp); PROCESS_USEDB_OVER: + if (code != 0) { if (pOut) { if (pOut->dbVgroup) taosHashCleanup(pOut->dbVgroup->vgHash); -- GitLab