提交 14426025 编写于 作者: D dapan1121

add async get db info api

上级 e7b8a7d5
......@@ -52,23 +52,31 @@ typedef struct SUserAuthInfo {
AUTH_TYPE type;
} SUserAuthInfo;
typedef struct SDbInfo {
int32_t vgVer;
int32_t tbNum;
int64_t dbId;
} SDbInfo;
typedef struct SCatalogReq {
SArray *pTableMeta; // element is SNAME
SArray *pDbVgroup; // element is db full name
SArray *pDbCfg; // element is db full name
SArray *pDbInfo; // element is db full name
SArray *pTableMeta; // element is SNAME
SArray *pTableHash; // element is SNAME
SArray *pUdf; // element is udf name
SArray *pDbCfg; // element is db full name
SArray *pIndex; // element is index name
SArray *pUser; // element is SUserAuthInfo
bool qNodeRequired; // valid qnode
} SCatalogReq;
typedef struct SMetaData {
SArray *pTableMeta; // SArray<STableMeta*>
SArray *pDbVgroup; // SArray<SArray<SVgroupInfo>*>
SArray *pDbCfg; // SArray<SDbCfgInfo>
SArray *pDbInfo; // SArray<SDbInfo>
SArray *pTableMeta; // SArray<STableMeta*>
SArray *pTableHash; // SArray<SVgroupInfo>
SArray *pUdfList; // SArray<SFuncInfo>
SArray *pDbCfg; // SArray<SDbCfgInfo>
SArray *pIndex; // SArray<SIndexInfo>
SArray *pUser; // SArray<bool>
SArray *pQnodeList; // SArray<SQueryNodeAddr>
......@@ -269,6 +277,8 @@ int32_t catalogChkAuth(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const
int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth);
int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet);
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId);
......
......@@ -49,19 +49,21 @@ enum {
};
enum {
CTG_ACT_UPDATE_VG = 0,
CTG_ACT_UPDATE_TBL,
CTG_ACT_REMOVE_DB,
CTG_ACT_REMOVE_STB,
CTG_ACT_REMOVE_TBL,
CTG_ACT_UPDATE_USER,
CTG_ACT_MAX
CTG_OP_UPDATE_VGROUP = 0,
CTG_OP_UPDATE_TB_META,
CTG_OP_DROP_DB_CACHE,
CTG_OP_DROP_STB_META,
CTG_OP_DROP_TB_META,
CTG_OP_UPDATE_USER,
CTG_OP_UPDATE_VG_EPSET,
CTG_OP_MAX
};
typedef enum {
CTG_TASK_GET_QNODE = 0,
CTG_TASK_GET_DB_VGROUP,
CTG_TASK_GET_DB_CFG,
CTG_TASK_GET_DB_INFO,
CTG_TASK_GET_TB_META,
CTG_TASK_GET_TB_HASH,
CTG_TASK_GET_INDEX,
......@@ -98,6 +100,10 @@ typedef struct SCtgDbCfgCtx {
char dbFName[TSDB_DB_FNAME_LEN];
} SCtgDbCfgCtx;
typedef struct SCtgDbInfoCtx {
char dbFName[TSDB_DB_FNAME_LEN];
} SCtgDbInfoCtx;
typedef struct SCtgTbHashCtx {
char dbFName[TSDB_DB_FNAME_LEN];
SName* pName;
......@@ -182,6 +188,7 @@ typedef struct SCtgJob {
int32_t dbCfgNum;
int32_t indexNum;
int32_t userNum;
int32_t dbInfoNum;
} SCtgJob;
typedef struct SCtgMsgCtx {
......@@ -285,16 +292,22 @@ typedef struct SCtgUpdateUserMsg {
SGetUserAuthRsp userAuth;
} SCtgUpdateUserMsg;
typedef struct SCtgUpdateEpsetMsg {
SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN];
int32_t vgId;
SEpSet epSet;
} SCtgUpdateEpsetMsg;
typedef struct SCtgMetaAction {
int32_t act;
typedef struct SCtgCacheOperation {
int32_t opId;
void *data;
bool syncReq;
uint64_t seqId;
} SCtgMetaAction;
} SCtgCacheOperation;
typedef struct SCtgQNode {
SCtgMetaAction action;
SCtgCacheOperation op;
struct SCtgQNode *next;
} SCtgQNode;
......@@ -321,13 +334,13 @@ typedef struct SCatalogMgmt {
} SCatalogMgmt;
typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
typedef int32_t (*ctgActFunc)(SCtgMetaAction *);
typedef int32_t (*ctgOpFunc)(SCtgCacheOperation *);
typedef struct SCtgAction {
int32_t actId;
typedef struct SCtgOperation {
int32_t opId;
char name[32];
ctgActFunc func;
} SCtgAction;
ctgOpFunc func;
} SCtgOperation;
#define CTG_QUEUE_ADD() atomic_add_fetch_64(&gCtgMgmt.queue.qRemainNum, 1)
#define CTG_QUEUE_SUB() atomic_sub_fetch_64(&gCtgMgmt.queue.qRemainNum, 1)
......@@ -435,12 +448,13 @@ int32_t ctgdShowCacheInfo(void);
int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq);
int32_t ctgGetTbMetaFromCache(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
int32_t ctgActUpdateVg(SCtgMetaAction *action);
int32_t ctgActUpdateTb(SCtgMetaAction *action);
int32_t ctgActRemoveDB(SCtgMetaAction *action);
int32_t ctgActRemoveStb(SCtgMetaAction *action);
int32_t ctgActRemoveTb(SCtgMetaAction *action);
int32_t ctgActUpdateUser(SCtgMetaAction *action);
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *action);
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *action);
int32_t ctgOpDropDbCache(SCtgCacheOperation *action);
int32_t ctgOpDropStbMeta(SCtgCacheOperation *action);
int32_t ctgOpDropTbMeta(SCtgCacheOperation *action);
int32_t ctgOpUpdateUser(SCtgCacheOperation *action);
int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation);
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache);
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache);
void ctgReleaseVgInfo(SCtgDBCache *dbCache);
......@@ -449,12 +463,13 @@ int32_t ctgTbMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32
int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
int32_t ctgReadTbVerFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType, uint64_t *suid, char *stbName);
int32_t ctgChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass);
int32_t ctgPutRmDBToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId);
int32_t ctgPutRmStbToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq);
int32_t ctgPutRmTbToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncReq);
int32_t ctgPutUpdateVgToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncReq);
int32_t ctgPutUpdateTbToQueue(SCatalog* pCtg, STableMetaOutput *output, bool syncReq);
int32_t ctgPutUpdateUserToQueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncReq);
int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId);
int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq);
int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncReq);
int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncReq);
int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool syncReq);
int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncReq);
int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet);
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type);
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size);
int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size);
......
......@@ -41,9 +41,9 @@ int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq
tNameGetFullDbName(pTableName, dbFName);
if (TSDB_SUPER_TABLE == tblMeta->tableType) {
CTG_ERR_JRET(ctgPutRmStbToQueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, tblMeta->suid, syncReq));
CTG_ERR_JRET(ctgDropStbMetaEnqueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, tblMeta->suid, syncReq));
} else {
CTG_ERR_JRET(ctgPutRmTbToQueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, syncReq));
CTG_ERR_JRET(ctgDropTbMetaEnqueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, syncReq));
}
_return:
......@@ -72,7 +72,7 @@ int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, con
CTG_ERR_JRET(ctgCloneVgInfo(DbOut.dbVgroup, pInfo));
CTG_ERR_RET(ctgPutUpdateVgToQueue(pCtg, dbFName, DbOut.dbId, DbOut.dbVgroup, false));
CTG_ERR_RET(ctgUpdateVgroupEnqueue(pCtg, dbFName, DbOut.dbId, DbOut.dbVgroup, false));
return TSDB_CODE_SUCCESS;
......@@ -108,13 +108,13 @@ int32_t ctgRefreshDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps,
if (code) {
if (CTG_DB_NOT_EXIST(code) && (NULL != dbCache)) {
ctgDebug("db no longer exist, dbFName:%s, dbId:%" PRIx64, input.db, input.dbId);
ctgPutRmDBToQueue(pCtg, input.db, input.dbId);
ctgDropDbCacheEnqueue(pCtg, input.db, input.dbId);
}
CTG_ERR_RET(code);
}
CTG_ERR_RET(ctgPutUpdateVgToQueue(pCtg, dbFName, DbOut.dbId, DbOut.dbVgroup, true));
CTG_ERR_RET(ctgUpdateVgroupEnqueue(pCtg, dbFName, DbOut.dbId, DbOut.dbVgroup, true));
return TSDB_CODE_SUCCESS;
}
......@@ -201,7 +201,7 @@ int32_t ctgRefreshTbMeta(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMetaOutput **pOut
CTG_ERR_JRET(ctgCloneMetaOutput(output, pOutput));
}
CTG_ERR_JRET(ctgPutUpdateTbToQueue(pCtg, output, syncReq));
CTG_ERR_JRET(ctgUpdateTbMetaEnqueue(pCtg, output, syncReq));
return TSDB_CODE_SUCCESS;
......@@ -298,9 +298,9 @@ _return:
}
if (TSDB_SUPER_TABLE == ctx->tbInfo.tbType) {
ctgPutRmStbToQueue(pCtg, dbFName, ctx->tbInfo.dbId, ctx->pName->tname, ctx->tbInfo.suid, false);
ctgDropStbMetaEnqueue(pCtg, dbFName, ctx->tbInfo.dbId, ctx->pName->tname, ctx->tbInfo.suid, false);
} else {
ctgPutRmTbToQueue(pCtg, dbFName, ctx->tbInfo.dbId, ctx->pName->tname, false);
ctgDropTbMetaEnqueue(pCtg, dbFName, ctx->tbInfo.dbId, ctx->pName->tname, false);
}
}
......@@ -348,7 +348,7 @@ int32_t ctgChkAuth(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const c
_return:
ctgPutUpdateUserToQueue(pCtg, &authRsp, false);
ctgUpdateUserEnqueue(pCtg, &authRsp, false);
return TSDB_CODE_SUCCESS;
}
......@@ -670,7 +670,7 @@ int32_t catalogUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId
CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
}
code = ctgPutUpdateVgToQueue(pCtg, dbFName, dbId, dbInfo, false);
code = ctgUpdateVgroupEnqueue(pCtg, dbFName, dbId, dbInfo, false);
_return:
......@@ -691,7 +691,7 @@ int32_t catalogRemoveDB(SCatalog* pCtg, const char* dbFName, uint64_t dbId) {
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
CTG_ERR_JRET(ctgPutRmDBToQueue(pCtg, dbFName, dbId));
CTG_ERR_JRET(ctgDropDbCacheEnqueue(pCtg, dbFName, dbId));
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
......@@ -701,7 +701,19 @@ _return:
}
int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet) {
return 0;
CTG_API_ENTER();
int32_t code = 0;
if (NULL == pCtg || NULL == dbFName || NULL == epSet) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_ERR_JRET(ctgUpdateVgEpsetEnqueue(pCtg, (char*)dbFName, vgId, epSet));
_return:
CTG_API_LEAVE(code);
}
int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName) {
......@@ -738,7 +750,7 @@ int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId,
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
CTG_ERR_JRET(ctgPutRmStbToQueue(pCtg, dbFName, dbId, stbName, suid, true));
CTG_ERR_JRET(ctgDropStbMetaEnqueue(pCtg, dbFName, dbId, stbName, suid, true));
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
......@@ -791,7 +803,7 @@ int32_t catalogUpdateSTableMeta(SCatalog* pCtg, STableMetaRsp *rspMsg) {
CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, true, &output->tbMeta));
CTG_ERR_JRET(ctgPutUpdateTbToQueue(pCtg, output, false));
CTG_ERR_JRET(ctgUpdateTbMetaEnqueue(pCtg, output, false));
CTG_API_LEAVE(code);
......@@ -1152,7 +1164,7 @@ int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_API_LEAVE(ctgPutUpdateUserToQueue(pCtg, pAuth, false));
CTG_API_LEAVE(ctgUpdateUserEnqueue(pCtg, pAuth, false));
}
......
......@@ -95,6 +95,30 @@ int32_t ctgInitGetDbCfgTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitGetDbInfoTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) {
SCtgTask task = {0};
task.type = CTG_TASK_GET_DB_INFO;
task.taskId = taskIdx;
task.pJob = pJob;
task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgDbInfoCtx));
if (NULL == task.taskCtx) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
SCtgDbInfoCtx* ctx = task.taskCtx;
memcpy(ctx->dbFName, dbFName, sizeof(ctx->dbFName));
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:%" PRIx64 " task %d type %d initialized, dbFName:%s", pJob->queryId, taskIdx, task.type, dbFName);
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, SName *name) {
SCtgTask task = {0};
......@@ -219,8 +243,9 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
int32_t dbCfgNum = (int32_t)taosArrayGetSize(pReq->pDbCfg);
int32_t indexNum = (int32_t)taosArrayGetSize(pReq->pIndex);
int32_t userNum = (int32_t)taosArrayGetSize(pReq->pUser);
int32_t dbInfoNum = (int32_t)taosArrayGetSize(pReq->pDbInfo);
int32_t taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dbCfgNum + indexNum + userNum;
int32_t taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dbCfgNum + indexNum + userNum + dbInfoNum;
if (taskNum <= 0) {
ctgError("empty input for job, taskNum:%d", taskNum);
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
......@@ -249,6 +274,7 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
pJob->dbCfgNum = dbCfgNum;
pJob->indexNum = indexNum;
pJob->userNum = userNum;
pJob->dbInfoNum = dbInfoNum;
pJob->pTasks = taosArrayInit(taskNum, sizeof(SCtgTask));
......@@ -268,6 +294,11 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
CTG_ERR_JRET(ctgInitGetDbCfgTask(pJob, taskIdx++, dbFName));
}
for (int32_t i = 0; i < dbInfoNum; ++i) {
char *dbFName = taosArrayGet(pReq->pDbInfo, i);
CTG_ERR_JRET(ctgInitGetDbInfoTask(pJob, taskIdx++, dbFName));
}
for (int32_t i = 0; i < tbMetaNum; ++i) {
SName *name = taosArrayGet(pReq->pTableMeta, i);
CTG_ERR_JRET(ctgInitGetTbMetaTask(pJob, taskIdx++, name));
......@@ -395,6 +426,20 @@ int32_t ctgDumpDbCfgRes(SCtgTask* pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgDumpDbInfoRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob;
if (NULL == pJob->jobRes.pDbInfo) {
pJob->jobRes.pDbInfo = taosArrayInit(pJob->dbInfoNum, sizeof(SDbInfo));
if (NULL == pJob->jobRes.pDbInfo) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
}
taosArrayPush(pJob->jobRes.pDbInfo, pTask->res);
return TSDB_CODE_SUCCESS;
}
int32_t ctgDumpUdfRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob;
if (NULL == pJob->jobRes.pUdfList) {
......@@ -620,7 +665,7 @@ int32_t ctgHandleGetDbVgRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pM
CTG_ERR_JRET(ctgGenerateVgList(pCtg, pOut->dbVgroup->vgHash, (SArray**)&pTask->res));
CTG_ERR_JRET(ctgPutUpdateVgToQueue(pCtg, ctx->dbFName, pOut->dbId, pOut->dbVgroup, false));
CTG_ERR_JRET(ctgUpdateVgroupEnqueue(pCtg, ctx->dbFName, pOut->dbId, pOut->dbVgroup, false));
pOut->dbVgroup = NULL;
break;
......@@ -659,7 +704,7 @@ int32_t ctgHandleGetTbHashRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, pOut->dbVgroup, ctx->pName, (SVgroupInfo*)pTask->res));
CTG_ERR_JRET(ctgPutUpdateVgToQueue(pCtg, ctx->dbFName, pOut->dbId, pOut->dbVgroup, false));
CTG_ERR_JRET(ctgUpdateVgroupEnqueue(pCtg, ctx->dbFName, pOut->dbId, pOut->dbVgroup, false));
pOut->dbVgroup = NULL;
break;
......@@ -691,6 +736,11 @@ _return:
CTG_RET(code);
}
int32_t ctgHandleGetDbInfoRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
CTG_RET(TSDB_CODE_APP_ERROR);
}
int32_t ctgHandleGetQnodeRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
......@@ -769,7 +819,7 @@ _return:
}
}
ctgPutUpdateUserToQueue(pCtg, pOut, false);
ctgUpdateUserEnqueue(pCtg, pOut, false);
taosMemoryFreeClear(pTask->msgCtx.out);
ctgHandleTaskEnd(pTask, code);
......@@ -933,6 +983,41 @@ int32_t ctgLaunchGetDbCfgTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgLaunchGetDbInfoTask(SCtgTask *pTask) {
int32_t code = 0;
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
SCtgDBCache *dbCache = NULL;
SCtgDbInfoCtx* pCtx = (SCtgDbInfoCtx*)pTask->taskCtx;
pTask->res = taosMemoryCalloc(1, sizeof(SDbInfo));
if (NULL == pTask->res) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
SDbInfo* pInfo = (SDbInfo*)pTask->res;
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &dbCache));
if (NULL != dbCache) {
pInfo->vgVer = dbCache->vgInfo->vgVersion;
pInfo->dbId = dbCache->dbId;
pInfo->tbNum = dbCache->vgInfo->numOfTable;
} else {
pInfo->vgVer = CTG_DEFAULT_INVALID_VERSION;
}
CTG_ERR_JRET(ctgHandleTaskEnd(pTask, 0));
_return:
if (dbCache) {
ctgReleaseVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
}
CTG_RET(code);
}
int32_t ctgLaunchGetIndexTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
......@@ -992,6 +1077,7 @@ SCtgAsyncFps gCtgAsyncFps[] = {
{ctgLaunchGetQnodeTask, ctgHandleGetQnodeRsp, ctgDumpQnodeRes},
{ctgLaunchGetDbVgTask, ctgHandleGetDbVgRsp, ctgDumpDbVgRes},
{ctgLaunchGetDbCfgTask, ctgHandleGetDbCfgRsp, ctgDumpDbCfgRes},
{ctgLaunchGetDbInfoTask, ctgHandleGetDbInfoRsp, ctgDumpDbInfoRes},
{ctgLaunchGetTbMetaTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetaRes},
{ctgLaunchGetTbHashTask, ctgHandleGetTbHashRsp, ctgDumpTbHashRes},
{ctgLaunchGetIndexTask, ctgHandleGetIndexRsp, ctgDumpIndexRes},
......
......@@ -19,37 +19,43 @@
#include "catalogInt.h"
#include "systable.h"
SCtgAction gCtgAction[CTG_ACT_MAX] = {
SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {
{
CTG_ACT_UPDATE_VG,
CTG_OP_UPDATE_VGROUP,
"update vgInfo",
ctgActUpdateVg
ctgOpUpdateVgroup
},
{
CTG_ACT_UPDATE_TBL,
CTG_OP_UPDATE_TB_META,
"update tbMeta",
ctgActUpdateTb
ctgOpUpdateTbMeta
},
{
CTG_ACT_REMOVE_DB,
"remove DB",
ctgActRemoveDB
CTG_OP_DROP_DB_CACHE,
"drop DB",
ctgOpDropDbCache
},
{
CTG_ACT_REMOVE_STB,
"remove stbMeta",
ctgActRemoveStb
CTG_OP_DROP_STB_META,
"drop stbMeta",
ctgOpDropStbMeta
},
{
CTG_ACT_REMOVE_TBL,
"remove tbMeta",
ctgActRemoveTb
CTG_OP_DROP_TB_META,
"drop tbMeta",
ctgOpDropTbMeta
},
{
CTG_ACT_UPDATE_USER,
CTG_OP_UPDATE_USER,
"update user",
ctgActUpdateUser
ctgOpUpdateUser
},
{
CTG_OP_UPDATE_VG_EPSET,
"update epset",
ctgOpUpdateEpset
}
};
......@@ -405,7 +411,7 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *
}
int32_t ctgGetTbTypeFromCache(SCatalog* pCtg, const char* dbFName, const char *tableName, int32_t *tbType) {
int32_t ctgReadTbTypeFromCache(SCatalog* pCtg, const char* dbFName, const char *tableName, int32_t *tbType) {
if (NULL == pCtg->dbCache) {
ctgWarn("empty db cache, dbFName:%s, tbName:%s", dbFName, tableName);
return TSDB_CODE_SUCCESS;
......@@ -491,7 +497,7 @@ _return:
}
void ctgWaitAction(SCtgMetaAction *action) {
void ctgWaitOpDone(SCtgCacheOperation *action) {
while (true) {
tsem_wait(&gCtgMgmt.queue.rspSem);
......@@ -509,7 +515,7 @@ void ctgWaitAction(SCtgMetaAction *action) {
}
}
void ctgPopAction(SCtgMetaAction **action) {
void ctgDequeue(SCtgCacheOperation **op) {
SCtgQNode *orig = gCtgMgmt.queue.head;
SCtgQNode *node = gCtgMgmt.queue.head->next;
......@@ -519,20 +525,20 @@ void ctgPopAction(SCtgMetaAction **action) {
taosMemoryFreeClear(orig);
*action = &node->action;
*op = &node->op;
}
int32_t ctgPushAction(SCatalog* pCtg, SCtgMetaAction *action) {
int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
SCtgQNode *node = taosMemoryCalloc(1, sizeof(SCtgQNode));
if (NULL == node) {
qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
CTG_RET(TSDB_CODE_CTG_MEM_ERROR);
}
action->seqId = atomic_add_fetch_64(&gCtgMgmt.queue.seqId, 1);
operation->seqId = atomic_add_fetch_64(&gCtgMgmt.queue.seqId, 1);
node->action = *action;
node->op = *operation;
CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
gCtgMgmt.queue.tail->next = node;
......@@ -544,19 +550,19 @@ int32_t ctgPushAction(SCatalog* pCtg, SCtgMetaAction *action) {
tsem_post(&gCtgMgmt.queue.reqSem);
ctgDebug("action [%s] added into queue", gCtgAction[action->act].name);
ctgDebug("action [%s] added into queue", gCtgCacheOperation[operation->opId].name);
if (action->syncReq) {
ctgWaitAction(action);
if (operation->syncReq) {
ctgWaitOpDone(operation);
}
return TSDB_CODE_SUCCESS;
}
int32_t ctgPutRmDBToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) {
int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) {
int32_t code = 0;
SCtgMetaAction action= {.act = CTG_ACT_REMOVE_DB};
SCtgCacheOperation action= {.opId = CTG_OP_DROP_DB_CACHE};
SCtgRemoveDBMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveDBMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveDBMsg));
......@@ -574,7 +580,7 @@ int32_t ctgPutRmDBToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) {
action.data = msg;
CTG_ERR_JRET(ctgPushAction(pCtg, &action));
CTG_ERR_JRET(ctgEnqueue(pCtg, &action));
return TSDB_CODE_SUCCESS;
......@@ -585,9 +591,9 @@ _return:
}
int32_t ctgPutRmStbToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq) {
int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq) {
int32_t code = 0;
SCtgMetaAction action= {.act = CTG_ACT_REMOVE_STB, .syncReq = syncReq};
SCtgCacheOperation action= {.opId = CTG_OP_DROP_STB_META, .syncReq = syncReq};
SCtgRemoveStbMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveStbMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveStbMsg));
......@@ -602,7 +608,7 @@ int32_t ctgPutRmStbToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, co
action.data = msg;
CTG_ERR_JRET(ctgPushAction(pCtg, &action));
CTG_ERR_JRET(ctgEnqueue(pCtg, &action));
return TSDB_CODE_SUCCESS;
......@@ -614,9 +620,9 @@ _return:
int32_t ctgPutRmTbToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncReq) {
int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncReq) {
int32_t code = 0;
SCtgMetaAction action= {.act = CTG_ACT_REMOVE_TBL, .syncReq = syncReq};
SCtgCacheOperation action= {.opId = CTG_OP_DROP_TB_META, .syncReq = syncReq};
SCtgRemoveTblMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveTblMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveTblMsg));
......@@ -630,7 +636,7 @@ int32_t ctgPutRmTbToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, con
action.data = msg;
CTG_ERR_JRET(ctgPushAction(pCtg, &action));
CTG_ERR_JRET(ctgEnqueue(pCtg, &action));
return TSDB_CODE_SUCCESS;
......@@ -640,9 +646,9 @@ _return:
CTG_RET(code);
}
int32_t ctgPutUpdateVgToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncReq) {
int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncReq) {
int32_t code = 0;
SCtgMetaAction action= {.act = CTG_ACT_UPDATE_VG, .syncReq = syncReq};
SCtgCacheOperation action= {.opId = CTG_OP_UPDATE_VGROUP, .syncReq = syncReq};
SCtgUpdateVgMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateVgMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
......@@ -662,7 +668,7 @@ int32_t ctgPutUpdateVgToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId,
action.data = msg;
CTG_ERR_JRET(ctgPushAction(pCtg, &action));
CTG_ERR_JRET(ctgEnqueue(pCtg, &action));
return TSDB_CODE_SUCCESS;
......@@ -673,9 +679,9 @@ _return:
CTG_RET(code);
}
int32_t ctgPutUpdateTbToQueue(SCatalog* pCtg, STableMetaOutput *output, bool syncReq) {
int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool syncReq) {
int32_t code = 0;
SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL, .syncReq = syncReq};
SCtgCacheOperation action= {.opId = CTG_OP_UPDATE_TB_META, .syncReq = syncReq};
SCtgUpdateTblMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTblMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg));
......@@ -692,7 +698,34 @@ int32_t ctgPutUpdateTbToQueue(SCatalog* pCtg, STableMetaOutput *output, bool syn
action.data = msg;
CTG_ERR_JRET(ctgPushAction(pCtg, &action));
CTG_ERR_JRET(ctgEnqueue(pCtg, &action));
return TSDB_CODE_SUCCESS;
_return:
taosMemoryFreeClear(msg);
CTG_RET(code);
}
int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet) {
int32_t code = 0;
SCtgCacheOperation operation= {.opId = CTG_OP_UPDATE_VG_EPSET};
SCtgUpdateEpsetMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateEpsetMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateEpsetMsg));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
msg->pCtg = pCtg;
strcpy(msg->dbFName, dbFName);
msg->vgId = vgId;
msg->epSet = *pEpSet;
operation.data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &operation));
return TSDB_CODE_SUCCESS;
......@@ -703,9 +736,11 @@ _return:
CTG_RET(code);
}
int32_t ctgPutUpdateUserToQueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncReq) {
int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncReq) {
int32_t code = 0;
SCtgMetaAction action= {.act = CTG_ACT_UPDATE_USER, .syncReq = syncReq};
SCtgCacheOperation action= {.opId = CTG_OP_UPDATE_USER, .syncReq = syncReq};
SCtgUpdateUserMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateUserMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg));
......@@ -717,7 +752,7 @@ int32_t ctgPutUpdateUserToQueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syn
action.data = msg;
CTG_ERR_JRET(ctgPushAction(pCtg, &action));
CTG_ERR_JRET(ctgEnqueue(pCtg, &action));
return TSDB_CODE_SUCCESS;
......@@ -1219,7 +1254,7 @@ int32_t ctgUpdateTbMetaToCache(SCatalog* pCtg, STableMetaOutput* pOut, bool sync
int32_t code = 0;
CTG_ERR_RET(ctgCloneMetaOutput(pOut, &pOutput));
CTG_ERR_JRET(ctgPutUpdateTbToQueue(pCtg, pOutput, syncReq));
CTG_ERR_JRET(ctgUpdateTbMetaEnqueue(pCtg, pOutput, syncReq));
return TSDB_CODE_SUCCESS;
......@@ -1230,9 +1265,9 @@ _return:
}
int32_t ctgActUpdateVg(SCtgMetaAction *action) {
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgUpdateVgMsg *msg = action->data;
SCtgUpdateVgMsg *msg = operation->data;
CTG_ERR_JRET(ctgWriteDBVgInfoToCache(msg->pCtg, msg->dbFName, msg->dbId, &msg->dbInfo));
......@@ -1244,9 +1279,9 @@ _return:
CTG_RET(code);
}
int32_t ctgActRemoveDB(SCtgMetaAction *action) {
int32_t ctgOpDropDbCache(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgRemoveDBMsg *msg = action->data;
SCtgRemoveDBMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL;
......@@ -1270,9 +1305,9 @@ _return:
}
int32_t ctgActUpdateTb(SCtgMetaAction *action) {
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgUpdateTblMsg *msg = action->data;
SCtgUpdateTblMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
STableMetaOutput* output = msg->output;
SCtgDBCache *dbCache = NULL;
......@@ -1316,9 +1351,9 @@ _return:
}
int32_t ctgActRemoveStb(SCtgMetaAction *action) {
int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgRemoveStbMsg *msg = action->data;
SCtgRemoveStbMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL;
......@@ -1362,9 +1397,9 @@ _return:
CTG_RET(code);
}
int32_t ctgActRemoveTb(SCtgMetaAction *action) {
int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgRemoveTblMsg *msg = action->data;
SCtgRemoveTblMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL;
......@@ -1397,9 +1432,9 @@ _return:
CTG_RET(code);
}
int32_t ctgActUpdateUser(SCtgMetaAction *action) {
int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgUpdateUserMsg *msg = action->data;
SCtgUpdateUserMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
if (NULL == pCtg->userCache) {
......@@ -1460,14 +1495,60 @@ _return:
CTG_RET(code);
}
void ctgUpdateThreadFuncUnexpectedStopped(void) {
int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgUpdateEpsetMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL;
CTG_ERR_RET(ctgAcquireDBCache(pCtg, msg->dbFName, &dbCache));
if (NULL == dbCache) {
ctgDebug("db %s not exist, ignore epset update", msg->dbFName);
goto _return;
}
SDBVgInfo *vgInfo = NULL;
CTG_ERR_RET(ctgWAcquireVgInfo(pCtg, dbCache));
if (NULL == dbCache->vgInfo) {
ctgWReleaseVgInfo(dbCache);
ctgDebug("vgroup in db %s not cached, ignore epset update", msg->dbFName);
goto _return;
}
SVgroupInfo* pInfo = taosHashGet(dbCache->vgInfo->vgHash, &msg->vgId, sizeof(msg->vgId));
if (NULL == pInfo) {
ctgWReleaseVgInfo(dbCache);
ctgDebug("no vgroup %d in db %s, ignore epset update", msg->vgId, msg->dbFName);
goto _return;
}
pInfo->epSet = msg->epSet;
ctgDebug("epset in vgroup %d updated, dbFName:%s", pInfo->vgId, msg->dbFName);
ctgWReleaseVgInfo(dbCache);
_return:
if (dbCache) {
ctgReleaseDBCache(msg->pCtg, dbCache);
}
taosMemoryFreeClear(msg);
CTG_RET(code);
}
void ctgUpdateThreadUnexpectedStopped(void) {
if (CTG_IS_LOCKED(&gCtgMgmt.lock) > 0) CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock);
}
void* ctgUpdateThreadFunc(void* param) {
setThreadName("catalog");
#ifdef WINDOWS
atexit(ctgUpdateThreadFuncUnexpectedStopped);
atexit(ctgUpdateThreadUnexpectedStopped);
#endif
qInfo("catalog update thread started");
......@@ -1483,17 +1564,17 @@ void* ctgUpdateThreadFunc(void* param) {
break;
}
SCtgMetaAction *action = NULL;
ctgPopAction(&action);
SCatalog *pCtg = ((SCtgUpdateMsgHeader *)action->data)->pCtg;
SCtgCacheOperation *operation = NULL;
ctgDequeue(&operation);
SCatalog *pCtg = ((SCtgUpdateMsgHeader *)operation->data)->pCtg;
ctgDebug("process [%s] action", gCtgAction[action->act].name);
ctgDebug("process [%s] operation", gCtgCacheOperation[operation->opId].name);
(*gCtgAction[action->act].func)(action);
(*gCtgCacheOperation[operation->opId].func)(operation);
gCtgMgmt.queue.seqDone = action->seqId;
gCtgMgmt.queue.seqDone = operation->seqId;
if (action->syncReq) {
if (operation->syncReq) {
tsem_post(&gCtgMgmt.queue.rspSem);
}
......
......@@ -42,6 +42,9 @@ void ctgFreeSMetaData(SMetaData* pData) {
}
taosArrayDestroy(pData->pDbCfg);
pData->pDbCfg = NULL;
taosArrayDestroy(pData->pDbInfo);
pData->pDbInfo = NULL;
taosArrayDestroy(pData->pIndex);
pData->pIndex = NULL;
......@@ -293,9 +296,12 @@ void ctgFreeTask(SCtgTask* pTask) {
}
case CTG_TASK_GET_DB_CFG: {
taosMemoryFreeClear(pTask->taskCtx);
if (pTask->res) {
taosMemoryFreeClear(pTask->res);
}
taosMemoryFreeClear(pTask->res);
break;
}
case CTG_TASK_GET_DB_INFO: {
taosMemoryFreeClear(pTask->taskCtx);
taosMemoryFreeClear(pTask->res);
break;
}
case CTG_TASK_GET_TB_HASH: {
......
......@@ -41,7 +41,7 @@
namespace {
extern "C" int32_t ctgdGetClusterCacheNum(struct SCatalog* pCatalog, int32_t type);
extern "C" int32_t ctgActUpdateTb(SCtgMetaAction *action);
extern "C" int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *action);
extern "C" int32_t ctgdEnableDebug(char *option);
extern "C" int32_t ctgdGetStatNum(char *option, void *res);
......@@ -888,9 +888,9 @@ void *ctgTestSetCtableMetaThread(void *param) {
int32_t n = 0;
STableMetaOutput *output = NULL;
SCtgMetaAction action = {0};
SCtgCacheOperation operation = {0};
action.act = CTG_ACT_UPDATE_TBL;
operation.opId = CTG_OP_UPDATE_TB_META;
while (!ctgTestStop) {
output = (STableMetaOutput *)taosMemoryMalloc(sizeof(STableMetaOutput));
......@@ -899,9 +899,9 @@ void *ctgTestSetCtableMetaThread(void *param) {
SCtgUpdateTblMsg *msg = (SCtgUpdateTblMsg *)taosMemoryMalloc(sizeof(SCtgUpdateTblMsg));
msg->pCtg = pCtg;
msg->output = output;
action.data = msg;
operation.data = msg;
code = ctgActUpdateTb(&action);
code = ctgOpUpdateTbMeta(&operation);
if (code) {
assert(0);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册