From 14426025a5a068b069b95448d89f61569dc4e8f7 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 27 May 2022 22:38:02 +0800 Subject: [PATCH] add async get db info api --- include/libs/catalog/catalog.h | 18 +- source/libs/catalog/inc/catalogInt.h | 71 +++++--- source/libs/catalog/src/catalog.c | 42 +++-- source/libs/catalog/src/ctgAsync.c | 94 +++++++++- source/libs/catalog/src/ctgCache.c | 213 +++++++++++++++------- source/libs/catalog/src/ctgUtil.c | 12 +- source/libs/catalog/test/catalogTests.cpp | 10 +- 7 files changed, 335 insertions(+), 125 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 5b746015e3..8027b9394e 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -52,23 +52,31 @@ typedef struct SUserAuthInfo { AUTH_TYPE type; } SUserAuthInfo; +typedef struct SDbInfo { + int32_t vgVer; + int32_t tbNum; + int64_t dbId; +} SDbInfo; + typedef struct SCatalogReq { - SArray *pTableMeta; // element is SNAME SArray *pDbVgroup; // element is db full name + SArray *pDbCfg; // element is db full name + SArray *pDbInfo; // element is db full name + SArray *pTableMeta; // element is SNAME SArray *pTableHash; // element is SNAME SArray *pUdf; // element is udf name - SArray *pDbCfg; // element is db full name SArray *pIndex; // element is index name SArray *pUser; // element is SUserAuthInfo bool qNodeRequired; // valid qnode } SCatalogReq; typedef struct SMetaData { - SArray *pTableMeta; // SArray SArray *pDbVgroup; // SArray*> + SArray *pDbCfg; // SArray + SArray *pDbInfo; // SArray + SArray *pTableMeta; // SArray SArray *pTableHash; // SArray SArray *pUdfList; // SArray - SArray *pDbCfg; // SArray SArray *pIndex; // SArray SArray *pUser; // SArray SArray *pQnodeList; // SArray @@ -269,6 +277,8 @@ int32_t catalogChkAuth(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth); +int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet); + int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId); diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 40bd3659a3..230949ab7f 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -49,19 +49,21 @@ enum { }; enum { - CTG_ACT_UPDATE_VG = 0, - CTG_ACT_UPDATE_TBL, - CTG_ACT_REMOVE_DB, - CTG_ACT_REMOVE_STB, - CTG_ACT_REMOVE_TBL, - CTG_ACT_UPDATE_USER, - CTG_ACT_MAX + CTG_OP_UPDATE_VGROUP = 0, + CTG_OP_UPDATE_TB_META, + CTG_OP_DROP_DB_CACHE, + CTG_OP_DROP_STB_META, + CTG_OP_DROP_TB_META, + CTG_OP_UPDATE_USER, + CTG_OP_UPDATE_VG_EPSET, + CTG_OP_MAX }; typedef enum { CTG_TASK_GET_QNODE = 0, CTG_TASK_GET_DB_VGROUP, CTG_TASK_GET_DB_CFG, + CTG_TASK_GET_DB_INFO, CTG_TASK_GET_TB_META, CTG_TASK_GET_TB_HASH, CTG_TASK_GET_INDEX, @@ -98,6 +100,10 @@ typedef struct SCtgDbCfgCtx { char dbFName[TSDB_DB_FNAME_LEN]; } SCtgDbCfgCtx; +typedef struct SCtgDbInfoCtx { + char dbFName[TSDB_DB_FNAME_LEN]; +} SCtgDbInfoCtx; + typedef struct SCtgTbHashCtx { char dbFName[TSDB_DB_FNAME_LEN]; SName* pName; @@ -182,6 +188,7 @@ typedef struct SCtgJob { int32_t dbCfgNum; int32_t indexNum; int32_t userNum; + int32_t dbInfoNum; } SCtgJob; typedef struct SCtgMsgCtx { @@ -285,16 +292,22 @@ typedef struct SCtgUpdateUserMsg { SGetUserAuthRsp userAuth; } SCtgUpdateUserMsg; +typedef struct SCtgUpdateEpsetMsg { + SCatalog* pCtg; + char dbFName[TSDB_DB_FNAME_LEN]; + int32_t vgId; + SEpSet epSet; +} SCtgUpdateEpsetMsg; -typedef struct SCtgMetaAction { - int32_t act; +typedef struct SCtgCacheOperation { + int32_t opId; void *data; bool syncReq; uint64_t seqId; -} SCtgMetaAction; +} SCtgCacheOperation; typedef struct SCtgQNode { - SCtgMetaAction action; + SCtgCacheOperation op; struct SCtgQNode *next; } SCtgQNode; @@ -321,13 +334,13 @@ typedef struct SCatalogMgmt { } SCatalogMgmt; typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); -typedef int32_t (*ctgActFunc)(SCtgMetaAction *); +typedef int32_t (*ctgOpFunc)(SCtgCacheOperation *); -typedef struct SCtgAction { - int32_t actId; +typedef struct SCtgOperation { + int32_t opId; char name[32]; - ctgActFunc func; -} SCtgAction; + ctgOpFunc func; +} SCtgOperation; #define CTG_QUEUE_ADD() atomic_add_fetch_64(&gCtgMgmt.queue.qRemainNum, 1) #define CTG_QUEUE_SUB() atomic_sub_fetch_64(&gCtgMgmt.queue.qRemainNum, 1) @@ -435,12 +448,13 @@ int32_t ctgdShowCacheInfo(void); int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq); int32_t ctgGetTbMetaFromCache(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta); -int32_t ctgActUpdateVg(SCtgMetaAction *action); -int32_t ctgActUpdateTb(SCtgMetaAction *action); -int32_t ctgActRemoveDB(SCtgMetaAction *action); -int32_t ctgActRemoveStb(SCtgMetaAction *action); -int32_t ctgActRemoveTb(SCtgMetaAction *action); -int32_t ctgActUpdateUser(SCtgMetaAction *action); +int32_t ctgOpUpdateVgroup(SCtgCacheOperation *action); +int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *action); +int32_t ctgOpDropDbCache(SCtgCacheOperation *action); +int32_t ctgOpDropStbMeta(SCtgCacheOperation *action); +int32_t ctgOpDropTbMeta(SCtgCacheOperation *action); +int32_t ctgOpUpdateUser(SCtgCacheOperation *action); +int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation); int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache); void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache); void ctgReleaseVgInfo(SCtgDBCache *dbCache); @@ -449,12 +463,13 @@ int32_t ctgTbMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32 int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta); int32_t ctgReadTbVerFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType, uint64_t *suid, char *stbName); int32_t ctgChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass); -int32_t ctgPutRmDBToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId); -int32_t ctgPutRmStbToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq); -int32_t ctgPutRmTbToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncReq); -int32_t ctgPutUpdateVgToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncReq); -int32_t ctgPutUpdateTbToQueue(SCatalog* pCtg, STableMetaOutput *output, bool syncReq); -int32_t ctgPutUpdateUserToQueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncReq); +int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId); +int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq); +int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncReq); +int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncReq); +int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool syncReq); +int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncReq); +int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet); int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type); int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size); int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 861de1ab60..3b04584003 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -41,9 +41,9 @@ int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq tNameGetFullDbName(pTableName, dbFName); if (TSDB_SUPER_TABLE == tblMeta->tableType) { - CTG_ERR_JRET(ctgPutRmStbToQueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, tblMeta->suid, syncReq)); + CTG_ERR_JRET(ctgDropStbMetaEnqueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, tblMeta->suid, syncReq)); } else { - CTG_ERR_JRET(ctgPutRmTbToQueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, syncReq)); + CTG_ERR_JRET(ctgDropTbMetaEnqueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, syncReq)); } _return: @@ -72,7 +72,7 @@ int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, con CTG_ERR_JRET(ctgCloneVgInfo(DbOut.dbVgroup, pInfo)); - CTG_ERR_RET(ctgPutUpdateVgToQueue(pCtg, dbFName, DbOut.dbId, DbOut.dbVgroup, false)); + CTG_ERR_RET(ctgUpdateVgroupEnqueue(pCtg, dbFName, DbOut.dbId, DbOut.dbVgroup, false)); return TSDB_CODE_SUCCESS; @@ -108,13 +108,13 @@ int32_t ctgRefreshDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, if (code) { if (CTG_DB_NOT_EXIST(code) && (NULL != dbCache)) { ctgDebug("db no longer exist, dbFName:%s, dbId:%" PRIx64, input.db, input.dbId); - ctgPutRmDBToQueue(pCtg, input.db, input.dbId); + ctgDropDbCacheEnqueue(pCtg, input.db, input.dbId); } CTG_ERR_RET(code); } - CTG_ERR_RET(ctgPutUpdateVgToQueue(pCtg, dbFName, DbOut.dbId, DbOut.dbVgroup, true)); + CTG_ERR_RET(ctgUpdateVgroupEnqueue(pCtg, dbFName, DbOut.dbId, DbOut.dbVgroup, true)); return TSDB_CODE_SUCCESS; } @@ -201,7 +201,7 @@ int32_t ctgRefreshTbMeta(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMetaOutput **pOut CTG_ERR_JRET(ctgCloneMetaOutput(output, pOutput)); } - CTG_ERR_JRET(ctgPutUpdateTbToQueue(pCtg, output, syncReq)); + CTG_ERR_JRET(ctgUpdateTbMetaEnqueue(pCtg, output, syncReq)); return TSDB_CODE_SUCCESS; @@ -298,9 +298,9 @@ _return: } if (TSDB_SUPER_TABLE == ctx->tbInfo.tbType) { - ctgPutRmStbToQueue(pCtg, dbFName, ctx->tbInfo.dbId, ctx->pName->tname, ctx->tbInfo.suid, false); + ctgDropStbMetaEnqueue(pCtg, dbFName, ctx->tbInfo.dbId, ctx->pName->tname, ctx->tbInfo.suid, false); } else { - ctgPutRmTbToQueue(pCtg, dbFName, ctx->tbInfo.dbId, ctx->pName->tname, false); + ctgDropTbMetaEnqueue(pCtg, dbFName, ctx->tbInfo.dbId, ctx->pName->tname, false); } } @@ -348,7 +348,7 @@ int32_t ctgChkAuth(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const c _return: - ctgPutUpdateUserToQueue(pCtg, &authRsp, false); + ctgUpdateUserEnqueue(pCtg, &authRsp, false); return TSDB_CODE_SUCCESS; } @@ -670,7 +670,7 @@ int32_t catalogUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT); } - code = ctgPutUpdateVgToQueue(pCtg, dbFName, dbId, dbInfo, false); + code = ctgUpdateVgroupEnqueue(pCtg, dbFName, dbId, dbInfo, false); _return: @@ -691,7 +691,7 @@ int32_t catalogRemoveDB(SCatalog* pCtg, const char* dbFName, uint64_t dbId) { CTG_API_LEAVE(TSDB_CODE_SUCCESS); } - CTG_ERR_JRET(ctgPutRmDBToQueue(pCtg, dbFName, dbId)); + CTG_ERR_JRET(ctgDropDbCacheEnqueue(pCtg, dbFName, dbId)); CTG_API_LEAVE(TSDB_CODE_SUCCESS); @@ -701,7 +701,19 @@ _return: } int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet) { - return 0; + CTG_API_ENTER(); + + int32_t code = 0; + + if (NULL == pCtg || NULL == dbFName || NULL == epSet) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + + CTG_ERR_JRET(ctgUpdateVgEpsetEnqueue(pCtg, (char*)dbFName, vgId, epSet)); + +_return: + + CTG_API_LEAVE(code); } int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName) { @@ -738,7 +750,7 @@ int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, CTG_API_LEAVE(TSDB_CODE_SUCCESS); } - CTG_ERR_JRET(ctgPutRmStbToQueue(pCtg, dbFName, dbId, stbName, suid, true)); + CTG_ERR_JRET(ctgDropStbMetaEnqueue(pCtg, dbFName, dbId, stbName, suid, true)); CTG_API_LEAVE(TSDB_CODE_SUCCESS); @@ -791,7 +803,7 @@ int32_t catalogUpdateSTableMeta(SCatalog* pCtg, STableMetaRsp *rspMsg) { CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, true, &output->tbMeta)); - CTG_ERR_JRET(ctgPutUpdateTbToQueue(pCtg, output, false)); + CTG_ERR_JRET(ctgUpdateTbMetaEnqueue(pCtg, output, false)); CTG_API_LEAVE(code); @@ -1152,7 +1164,7 @@ int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth) { CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } - CTG_API_LEAVE(ctgPutUpdateUserToQueue(pCtg, pAuth, false)); + CTG_API_LEAVE(ctgUpdateUserEnqueue(pCtg, pAuth, false)); } diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 0341c3638b..eb84bf00a4 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -95,6 +95,30 @@ int32_t ctgInitGetDbCfgTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) { return TSDB_CODE_SUCCESS; } +int32_t ctgInitGetDbInfoTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) { + SCtgTask task = {0}; + + task.type = CTG_TASK_GET_DB_INFO; + task.taskId = taskIdx; + task.pJob = pJob; + + task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgDbInfoCtx)); + if (NULL == task.taskCtx) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + SCtgDbInfoCtx* ctx = task.taskCtx; + + memcpy(ctx->dbFName, dbFName, sizeof(ctx->dbFName)); + + taosArrayPush(pJob->pTasks, &task); + + qDebug("QID:%" PRIx64 " task %d type %d initialized, dbFName:%s", pJob->queryId, taskIdx, task.type, dbFName); + + return TSDB_CODE_SUCCESS; +} + + int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, SName *name) { SCtgTask task = {0}; @@ -219,8 +243,9 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* int32_t dbCfgNum = (int32_t)taosArrayGetSize(pReq->pDbCfg); int32_t indexNum = (int32_t)taosArrayGetSize(pReq->pIndex); int32_t userNum = (int32_t)taosArrayGetSize(pReq->pUser); + int32_t dbInfoNum = (int32_t)taosArrayGetSize(pReq->pDbInfo); - int32_t taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dbCfgNum + indexNum + userNum; + int32_t taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dbCfgNum + indexNum + userNum + dbInfoNum; if (taskNum <= 0) { ctgError("empty input for job, taskNum:%d", taskNum); CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); @@ -249,6 +274,7 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* pJob->dbCfgNum = dbCfgNum; pJob->indexNum = indexNum; pJob->userNum = userNum; + pJob->dbInfoNum = dbInfoNum; pJob->pTasks = taosArrayInit(taskNum, sizeof(SCtgTask)); @@ -268,6 +294,11 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* CTG_ERR_JRET(ctgInitGetDbCfgTask(pJob, taskIdx++, dbFName)); } + for (int32_t i = 0; i < dbInfoNum; ++i) { + char *dbFName = taosArrayGet(pReq->pDbInfo, i); + CTG_ERR_JRET(ctgInitGetDbInfoTask(pJob, taskIdx++, dbFName)); + } + for (int32_t i = 0; i < tbMetaNum; ++i) { SName *name = taosArrayGet(pReq->pTableMeta, i); CTG_ERR_JRET(ctgInitGetTbMetaTask(pJob, taskIdx++, name)); @@ -395,6 +426,20 @@ int32_t ctgDumpDbCfgRes(SCtgTask* pTask) { return TSDB_CODE_SUCCESS; } +int32_t ctgDumpDbInfoRes(SCtgTask* pTask) { + SCtgJob* pJob = pTask->pJob; + if (NULL == pJob->jobRes.pDbInfo) { + pJob->jobRes.pDbInfo = taosArrayInit(pJob->dbInfoNum, sizeof(SDbInfo)); + if (NULL == pJob->jobRes.pDbInfo) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + } + + taosArrayPush(pJob->jobRes.pDbInfo, pTask->res); + + return TSDB_CODE_SUCCESS; +} + int32_t ctgDumpUdfRes(SCtgTask* pTask) { SCtgJob* pJob = pTask->pJob; if (NULL == pJob->jobRes.pUdfList) { @@ -620,7 +665,7 @@ int32_t ctgHandleGetDbVgRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pM CTG_ERR_JRET(ctgGenerateVgList(pCtg, pOut->dbVgroup->vgHash, (SArray**)&pTask->res)); - CTG_ERR_JRET(ctgPutUpdateVgToQueue(pCtg, ctx->dbFName, pOut->dbId, pOut->dbVgroup, false)); + CTG_ERR_JRET(ctgUpdateVgroupEnqueue(pCtg, ctx->dbFName, pOut->dbId, pOut->dbVgroup, false)); pOut->dbVgroup = NULL; break; @@ -659,7 +704,7 @@ int32_t ctgHandleGetTbHashRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf * CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, pOut->dbVgroup, ctx->pName, (SVgroupInfo*)pTask->res)); - CTG_ERR_JRET(ctgPutUpdateVgToQueue(pCtg, ctx->dbFName, pOut->dbId, pOut->dbVgroup, false)); + CTG_ERR_JRET(ctgUpdateVgroupEnqueue(pCtg, ctx->dbFName, pOut->dbId, pOut->dbVgroup, false)); pOut->dbVgroup = NULL; break; @@ -691,6 +736,11 @@ _return: CTG_RET(code); } +int32_t ctgHandleGetDbInfoRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { + CTG_RET(TSDB_CODE_APP_ERROR); +} + + int32_t ctgHandleGetQnodeRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); @@ -769,7 +819,7 @@ _return: } } - ctgPutUpdateUserToQueue(pCtg, pOut, false); + ctgUpdateUserEnqueue(pCtg, pOut, false); taosMemoryFreeClear(pTask->msgCtx.out); ctgHandleTaskEnd(pTask, code); @@ -933,6 +983,41 @@ int32_t ctgLaunchGetDbCfgTask(SCtgTask *pTask) { return TSDB_CODE_SUCCESS; } +int32_t ctgLaunchGetDbInfoTask(SCtgTask *pTask) { + int32_t code = 0; + SCatalog* pCtg = pTask->pJob->pCtg; + void *pTrans = pTask->pJob->pTrans; + const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps; + SCtgDBCache *dbCache = NULL; + SCtgDbInfoCtx* pCtx = (SCtgDbInfoCtx*)pTask->taskCtx; + + pTask->res = taosMemoryCalloc(1, sizeof(SDbInfo)); + if (NULL == pTask->res) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + SDbInfo* pInfo = (SDbInfo*)pTask->res; + CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &dbCache)); + if (NULL != dbCache) { + pInfo->vgVer = dbCache->vgInfo->vgVersion; + pInfo->dbId = dbCache->dbId; + pInfo->tbNum = dbCache->vgInfo->numOfTable; + } else { + pInfo->vgVer = CTG_DEFAULT_INVALID_VERSION; + } + + CTG_ERR_JRET(ctgHandleTaskEnd(pTask, 0)); + +_return: + + if (dbCache) { + ctgReleaseVgInfo(dbCache); + ctgReleaseDBCache(pCtg, dbCache); + } + + CTG_RET(code); +} + int32_t ctgLaunchGetIndexTask(SCtgTask *pTask) { SCatalog* pCtg = pTask->pJob->pCtg; void *pTrans = pTask->pJob->pTrans; @@ -992,6 +1077,7 @@ SCtgAsyncFps gCtgAsyncFps[] = { {ctgLaunchGetQnodeTask, ctgHandleGetQnodeRsp, ctgDumpQnodeRes}, {ctgLaunchGetDbVgTask, ctgHandleGetDbVgRsp, ctgDumpDbVgRes}, {ctgLaunchGetDbCfgTask, ctgHandleGetDbCfgRsp, ctgDumpDbCfgRes}, + {ctgLaunchGetDbInfoTask, ctgHandleGetDbInfoRsp, ctgDumpDbInfoRes}, {ctgLaunchGetTbMetaTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetaRes}, {ctgLaunchGetTbHashTask, ctgHandleGetTbHashRsp, ctgDumpTbHashRes}, {ctgLaunchGetIndexTask, ctgHandleGetIndexRsp, ctgDumpIndexRes}, diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 9161c7cb32..d1e2056bec 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -19,37 +19,43 @@ #include "catalogInt.h" #include "systable.h" -SCtgAction gCtgAction[CTG_ACT_MAX] = { +SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = { { - CTG_ACT_UPDATE_VG, + CTG_OP_UPDATE_VGROUP, "update vgInfo", - ctgActUpdateVg + ctgOpUpdateVgroup }, { - CTG_ACT_UPDATE_TBL, + CTG_OP_UPDATE_TB_META, "update tbMeta", - ctgActUpdateTb + ctgOpUpdateTbMeta }, { - CTG_ACT_REMOVE_DB, - "remove DB", - ctgActRemoveDB + CTG_OP_DROP_DB_CACHE, + "drop DB", + ctgOpDropDbCache }, { - CTG_ACT_REMOVE_STB, - "remove stbMeta", - ctgActRemoveStb + CTG_OP_DROP_STB_META, + "drop stbMeta", + ctgOpDropStbMeta }, { - CTG_ACT_REMOVE_TBL, - "remove tbMeta", - ctgActRemoveTb + CTG_OP_DROP_TB_META, + "drop tbMeta", + ctgOpDropTbMeta }, { - CTG_ACT_UPDATE_USER, + CTG_OP_UPDATE_USER, "update user", - ctgActUpdateUser + ctgOpUpdateUser + }, + { + CTG_OP_UPDATE_VG_EPSET, + "update epset", + ctgOpUpdateEpset } + }; @@ -405,7 +411,7 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, const SName *pTableName, int32_t * } -int32_t ctgGetTbTypeFromCache(SCatalog* pCtg, const char* dbFName, const char *tableName, int32_t *tbType) { +int32_t ctgReadTbTypeFromCache(SCatalog* pCtg, const char* dbFName, const char *tableName, int32_t *tbType) { if (NULL == pCtg->dbCache) { ctgWarn("empty db cache, dbFName:%s, tbName:%s", dbFName, tableName); return TSDB_CODE_SUCCESS; @@ -491,7 +497,7 @@ _return: } -void ctgWaitAction(SCtgMetaAction *action) { +void ctgWaitOpDone(SCtgCacheOperation *action) { while (true) { tsem_wait(&gCtgMgmt.queue.rspSem); @@ -509,7 +515,7 @@ void ctgWaitAction(SCtgMetaAction *action) { } } -void ctgPopAction(SCtgMetaAction **action) { +void ctgDequeue(SCtgCacheOperation **op) { SCtgQNode *orig = gCtgMgmt.queue.head; SCtgQNode *node = gCtgMgmt.queue.head->next; @@ -519,20 +525,20 @@ void ctgPopAction(SCtgMetaAction **action) { taosMemoryFreeClear(orig); - *action = &node->action; + *op = &node->op; } -int32_t ctgPushAction(SCatalog* pCtg, SCtgMetaAction *action) { +int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) { SCtgQNode *node = taosMemoryCalloc(1, sizeof(SCtgQNode)); if (NULL == node) { qError("calloc %d failed", (int32_t)sizeof(SCtgQNode)); CTG_RET(TSDB_CODE_CTG_MEM_ERROR); } - action->seqId = atomic_add_fetch_64(&gCtgMgmt.queue.seqId, 1); + operation->seqId = atomic_add_fetch_64(&gCtgMgmt.queue.seqId, 1); - node->action = *action; + node->op = *operation; CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock); gCtgMgmt.queue.tail->next = node; @@ -544,19 +550,19 @@ int32_t ctgPushAction(SCatalog* pCtg, SCtgMetaAction *action) { tsem_post(&gCtgMgmt.queue.reqSem); - ctgDebug("action [%s] added into queue", gCtgAction[action->act].name); + ctgDebug("action [%s] added into queue", gCtgCacheOperation[operation->opId].name); - if (action->syncReq) { - ctgWaitAction(action); + if (operation->syncReq) { + ctgWaitOpDone(operation); } return TSDB_CODE_SUCCESS; } -int32_t ctgPutRmDBToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) { +int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) { int32_t code = 0; - SCtgMetaAction action= {.act = CTG_ACT_REMOVE_DB}; + SCtgCacheOperation action= {.opId = CTG_OP_DROP_DB_CACHE}; SCtgRemoveDBMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveDBMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveDBMsg)); @@ -574,7 +580,7 @@ int32_t ctgPutRmDBToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) { action.data = msg; - CTG_ERR_JRET(ctgPushAction(pCtg, &action)); + CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); return TSDB_CODE_SUCCESS; @@ -585,9 +591,9 @@ _return: } -int32_t ctgPutRmStbToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq) { +int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq) { int32_t code = 0; - SCtgMetaAction action= {.act = CTG_ACT_REMOVE_STB, .syncReq = syncReq}; + SCtgCacheOperation action= {.opId = CTG_OP_DROP_STB_META, .syncReq = syncReq}; SCtgRemoveStbMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveStbMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveStbMsg)); @@ -602,7 +608,7 @@ int32_t ctgPutRmStbToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, co action.data = msg; - CTG_ERR_JRET(ctgPushAction(pCtg, &action)); + CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); return TSDB_CODE_SUCCESS; @@ -614,9 +620,9 @@ _return: -int32_t ctgPutRmTbToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncReq) { +int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncReq) { int32_t code = 0; - SCtgMetaAction action= {.act = CTG_ACT_REMOVE_TBL, .syncReq = syncReq}; + SCtgCacheOperation action= {.opId = CTG_OP_DROP_TB_META, .syncReq = syncReq}; SCtgRemoveTblMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveTblMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveTblMsg)); @@ -630,7 +636,7 @@ int32_t ctgPutRmTbToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, con action.data = msg; - CTG_ERR_JRET(ctgPushAction(pCtg, &action)); + CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); return TSDB_CODE_SUCCESS; @@ -640,9 +646,9 @@ _return: CTG_RET(code); } -int32_t ctgPutUpdateVgToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncReq) { +int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncReq) { int32_t code = 0; - SCtgMetaAction action= {.act = CTG_ACT_UPDATE_VG, .syncReq = syncReq}; + SCtgCacheOperation action= {.opId = CTG_OP_UPDATE_VGROUP, .syncReq = syncReq}; SCtgUpdateVgMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateVgMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg)); @@ -662,7 +668,7 @@ int32_t ctgPutUpdateVgToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, action.data = msg; - CTG_ERR_JRET(ctgPushAction(pCtg, &action)); + CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); return TSDB_CODE_SUCCESS; @@ -673,9 +679,9 @@ _return: CTG_RET(code); } -int32_t ctgPutUpdateTbToQueue(SCatalog* pCtg, STableMetaOutput *output, bool syncReq) { +int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool syncReq) { int32_t code = 0; - SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL, .syncReq = syncReq}; + SCtgCacheOperation action= {.opId = CTG_OP_UPDATE_TB_META, .syncReq = syncReq}; SCtgUpdateTblMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTblMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg)); @@ -692,7 +698,34 @@ int32_t ctgPutUpdateTbToQueue(SCatalog* pCtg, STableMetaOutput *output, bool syn action.data = msg; - CTG_ERR_JRET(ctgPushAction(pCtg, &action)); + CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); + + return TSDB_CODE_SUCCESS; + +_return: + + taosMemoryFreeClear(msg); + + CTG_RET(code); +} + +int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet) { + int32_t code = 0; + SCtgCacheOperation operation= {.opId = CTG_OP_UPDATE_VG_EPSET}; + SCtgUpdateEpsetMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateEpsetMsg)); + if (NULL == msg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateEpsetMsg)); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + msg->pCtg = pCtg; + strcpy(msg->dbFName, dbFName); + msg->vgId = vgId; + msg->epSet = *pEpSet; + + operation.data = msg; + + CTG_ERR_JRET(ctgEnqueue(pCtg, &operation)); return TSDB_CODE_SUCCESS; @@ -703,9 +736,11 @@ _return: CTG_RET(code); } -int32_t ctgPutUpdateUserToQueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncReq) { + + +int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncReq) { int32_t code = 0; - SCtgMetaAction action= {.act = CTG_ACT_UPDATE_USER, .syncReq = syncReq}; + SCtgCacheOperation action= {.opId = CTG_OP_UPDATE_USER, .syncReq = syncReq}; SCtgUpdateUserMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateUserMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg)); @@ -717,7 +752,7 @@ int32_t ctgPutUpdateUserToQueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syn action.data = msg; - CTG_ERR_JRET(ctgPushAction(pCtg, &action)); + CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); return TSDB_CODE_SUCCESS; @@ -1219,7 +1254,7 @@ int32_t ctgUpdateTbMetaToCache(SCatalog* pCtg, STableMetaOutput* pOut, bool sync int32_t code = 0; CTG_ERR_RET(ctgCloneMetaOutput(pOut, &pOutput)); - CTG_ERR_JRET(ctgPutUpdateTbToQueue(pCtg, pOutput, syncReq)); + CTG_ERR_JRET(ctgUpdateTbMetaEnqueue(pCtg, pOutput, syncReq)); return TSDB_CODE_SUCCESS; @@ -1230,9 +1265,9 @@ _return: } -int32_t ctgActUpdateVg(SCtgMetaAction *action) { +int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { int32_t code = 0; - SCtgUpdateVgMsg *msg = action->data; + SCtgUpdateVgMsg *msg = operation->data; CTG_ERR_JRET(ctgWriteDBVgInfoToCache(msg->pCtg, msg->dbFName, msg->dbId, &msg->dbInfo)); @@ -1244,9 +1279,9 @@ _return: CTG_RET(code); } -int32_t ctgActRemoveDB(SCtgMetaAction *action) { +int32_t ctgOpDropDbCache(SCtgCacheOperation *operation) { int32_t code = 0; - SCtgRemoveDBMsg *msg = action->data; + SCtgRemoveDBMsg *msg = operation->data; SCatalog* pCtg = msg->pCtg; SCtgDBCache *dbCache = NULL; @@ -1270,9 +1305,9 @@ _return: } -int32_t ctgActUpdateTb(SCtgMetaAction *action) { +int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) { int32_t code = 0; - SCtgUpdateTblMsg *msg = action->data; + SCtgUpdateTblMsg *msg = operation->data; SCatalog* pCtg = msg->pCtg; STableMetaOutput* output = msg->output; SCtgDBCache *dbCache = NULL; @@ -1316,9 +1351,9 @@ _return: } -int32_t ctgActRemoveStb(SCtgMetaAction *action) { +int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) { int32_t code = 0; - SCtgRemoveStbMsg *msg = action->data; + SCtgRemoveStbMsg *msg = operation->data; SCatalog* pCtg = msg->pCtg; SCtgDBCache *dbCache = NULL; @@ -1362,9 +1397,9 @@ _return: CTG_RET(code); } -int32_t ctgActRemoveTb(SCtgMetaAction *action) { +int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) { int32_t code = 0; - SCtgRemoveTblMsg *msg = action->data; + SCtgRemoveTblMsg *msg = operation->data; SCatalog* pCtg = msg->pCtg; SCtgDBCache *dbCache = NULL; @@ -1397,9 +1432,9 @@ _return: CTG_RET(code); } -int32_t ctgActUpdateUser(SCtgMetaAction *action) { +int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) { int32_t code = 0; - SCtgUpdateUserMsg *msg = action->data; + SCtgUpdateUserMsg *msg = operation->data; SCatalog* pCtg = msg->pCtg; if (NULL == pCtg->userCache) { @@ -1460,14 +1495,60 @@ _return: CTG_RET(code); } -void ctgUpdateThreadFuncUnexpectedStopped(void) { +int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) { + int32_t code = 0; + SCtgUpdateEpsetMsg *msg = operation->data; + SCatalog* pCtg = msg->pCtg; + + SCtgDBCache *dbCache = NULL; + CTG_ERR_RET(ctgAcquireDBCache(pCtg, msg->dbFName, &dbCache)); + if (NULL == dbCache) { + ctgDebug("db %s not exist, ignore epset update", msg->dbFName); + goto _return; + } + + SDBVgInfo *vgInfo = NULL; + CTG_ERR_RET(ctgWAcquireVgInfo(pCtg, dbCache)); + + if (NULL == dbCache->vgInfo) { + ctgWReleaseVgInfo(dbCache); + ctgDebug("vgroup in db %s not cached, ignore epset update", msg->dbFName); + goto _return; + } + + SVgroupInfo* pInfo = taosHashGet(dbCache->vgInfo->vgHash, &msg->vgId, sizeof(msg->vgId)); + if (NULL == pInfo) { + ctgWReleaseVgInfo(dbCache); + ctgDebug("no vgroup %d in db %s, ignore epset update", msg->vgId, msg->dbFName); + goto _return; + } + + pInfo->epSet = msg->epSet; + + ctgDebug("epset in vgroup %d updated, dbFName:%s", pInfo->vgId, msg->dbFName); + + ctgWReleaseVgInfo(dbCache); + +_return: + + if (dbCache) { + ctgReleaseDBCache(msg->pCtg, dbCache); + } + + taosMemoryFreeClear(msg); + + CTG_RET(code); +} + + +void ctgUpdateThreadUnexpectedStopped(void) { if (CTG_IS_LOCKED(&gCtgMgmt.lock) > 0) CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); } void* ctgUpdateThreadFunc(void* param) { setThreadName("catalog"); #ifdef WINDOWS - atexit(ctgUpdateThreadFuncUnexpectedStopped); + atexit(ctgUpdateThreadUnexpectedStopped); #endif qInfo("catalog update thread started"); @@ -1483,17 +1564,17 @@ void* ctgUpdateThreadFunc(void* param) { break; } - SCtgMetaAction *action = NULL; - ctgPopAction(&action); - SCatalog *pCtg = ((SCtgUpdateMsgHeader *)action->data)->pCtg; + SCtgCacheOperation *operation = NULL; + ctgDequeue(&operation); + SCatalog *pCtg = ((SCtgUpdateMsgHeader *)operation->data)->pCtg; - ctgDebug("process [%s] action", gCtgAction[action->act].name); + ctgDebug("process [%s] operation", gCtgCacheOperation[operation->opId].name); - (*gCtgAction[action->act].func)(action); + (*gCtgCacheOperation[operation->opId].func)(operation); - gCtgMgmt.queue.seqDone = action->seqId; + gCtgMgmt.queue.seqDone = operation->seqId; - if (action->syncReq) { + if (operation->syncReq) { tsem_post(&gCtgMgmt.queue.rspSem); } diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index 1f78a97733..4fbf1463d8 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -42,6 +42,9 @@ void ctgFreeSMetaData(SMetaData* pData) { } taosArrayDestroy(pData->pDbCfg); pData->pDbCfg = NULL; + + taosArrayDestroy(pData->pDbInfo); + pData->pDbInfo = NULL; taosArrayDestroy(pData->pIndex); pData->pIndex = NULL; @@ -293,9 +296,12 @@ void ctgFreeTask(SCtgTask* pTask) { } case CTG_TASK_GET_DB_CFG: { taosMemoryFreeClear(pTask->taskCtx); - if (pTask->res) { - taosMemoryFreeClear(pTask->res); - } + taosMemoryFreeClear(pTask->res); + break; + } + case CTG_TASK_GET_DB_INFO: { + taosMemoryFreeClear(pTask->taskCtx); + taosMemoryFreeClear(pTask->res); break; } case CTG_TASK_GET_TB_HASH: { diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index 6c7d1ac4ca..4409f2c321 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -41,7 +41,7 @@ namespace { extern "C" int32_t ctgdGetClusterCacheNum(struct SCatalog* pCatalog, int32_t type); -extern "C" int32_t ctgActUpdateTb(SCtgMetaAction *action); +extern "C" int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *action); extern "C" int32_t ctgdEnableDebug(char *option); extern "C" int32_t ctgdGetStatNum(char *option, void *res); @@ -888,9 +888,9 @@ void *ctgTestSetCtableMetaThread(void *param) { int32_t n = 0; STableMetaOutput *output = NULL; - SCtgMetaAction action = {0}; + SCtgCacheOperation operation = {0}; - action.act = CTG_ACT_UPDATE_TBL; + operation.opId = CTG_OP_UPDATE_TB_META; while (!ctgTestStop) { output = (STableMetaOutput *)taosMemoryMalloc(sizeof(STableMetaOutput)); @@ -899,9 +899,9 @@ void *ctgTestSetCtableMetaThread(void *param) { SCtgUpdateTblMsg *msg = (SCtgUpdateTblMsg *)taosMemoryMalloc(sizeof(SCtgUpdateTblMsg)); msg->pCtg = pCtg; msg->output = output; - action.data = msg; + operation.data = msg; - code = ctgActUpdateTb(&action); + code = ctgOpUpdateTbMeta(&operation); if (code) { assert(0); } -- GitLab