From 8f16b9b8c11eca9e463cb4fbcaf5c48a4a8fcfb7 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Sat, 4 Jun 2022 15:59:41 +0800 Subject: [PATCH] catalog force update --- include/libs/catalog/catalog.h | 3 +- include/libs/qcom/query.h | 2 +- source/client/src/clientEnv.c | 8 +-- source/client/src/clientImpl.c | 14 +++++ source/client/src/clientMsgHandler.c | 2 +- source/libs/catalog/inc/catalogInt.h | 21 +++++-- source/libs/catalog/src/ctgAsync.c | 6 ++ source/libs/catalog/src/ctgCache.c | 80 ++++++++++++++++++++++++--- source/libs/catalog/src/ctgDbg.c | 3 +- source/libs/scheduler/src/schJob.c | 25 +++++++-- source/libs/scheduler/src/schRemote.c | 1 + 11 files changed, 136 insertions(+), 29 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index f0e642bc9a..ee237741c3 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -68,6 +68,7 @@ typedef struct SCatalogReq { SArray *pIndex; // element is index name SArray *pUser; // element is SUserAuthInfo bool qNodeRequired; // valid qnode + bool forceUpdate; } SCatalogReq; typedef struct SMetaData { @@ -280,7 +281,7 @@ int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth); int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet); -int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId); +int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, bool forceUpdate); /** diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 45a7e9a29f..a17be44846 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -222,7 +222,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t || (_type) == TDMT_VND_DROP_TABLE || (_type) == TDMT_VND_DROP_STB) #define NEED_SCHEDULER_RETRY_ERROR(_code) \ - ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL) + ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR) #define REQUEST_MAX_TRY_TIMES 1 diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 4b39a51584..9afb989d27 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -223,6 +223,10 @@ static void doDestroyRequest(void *p) { taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self)); + if (pRequest->body.queryJob != 0) { + schedulerFreeJob(pRequest->body.queryJob); + } + taosMemoryFreeClear(pRequest->msgBuf); taosMemoryFreeClear(pRequest->sqlstr); taosMemoryFreeClear(pRequest->pDb); @@ -230,10 +234,6 @@ static void doDestroyRequest(void *p) { doFreeReqResultInfo(&pRequest->body.resInfo); qDestroyQueryPlan(pRequest->body.pDag); - if (pRequest->body.queryJob != 0) { - schedulerFreeJob(pRequest->body.queryJob); - } - taosArrayDestroy(pRequest->tableList); taosArrayDestroy(pRequest->dbList); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 774ef5f248..9762ac6ba5 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -741,6 +741,20 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { do { destroyRequest(pRequest); pRequest = launchQuery(pTscObj, sql, sqlLen); + if (*sql == 'y') { + SCatalog *pCatalog = NULL; + code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); + SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); + ctgdLaunchAsyncCall(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, pRequest->requestId, false); + break; + } else if (*sql == 'z') { + SCatalog *pCatalog = NULL; + code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); + SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); + ctgdLaunchAsyncCall(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, pRequest->requestId, false); + break; + } + if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) { break; } diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 1039d36362..2d5199f181 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -180,7 +180,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { taosMemoryFreeClear(output.dbVgroup); tscError("0x%" PRIx64 " failed to build use db output since %s", pRequest->requestId, terrstr()); - } else if (output.dbVgroup) { + } else if (output.dbVgroup && output.dbVgroup->vgHash) { struct SCatalog* pCatalog = NULL; int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index cebe696390..9219a382e4 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -52,6 +52,7 @@ enum { CTG_OP_UPDATE_VGROUP = 0, CTG_OP_UPDATE_TB_META, CTG_OP_DROP_DB_CACHE, + CTG_OP_DROP_DB_VGROUP, CTG_OP_DROP_STB_META, CTG_OP_DROP_TB_META, CTG_OP_UPDATE_USER, @@ -266,26 +267,32 @@ typedef struct SCtgUpdateTblMsg { STableMetaOutput* output; } SCtgUpdateTblMsg; -typedef struct SCtgRemoveDBMsg { +typedef struct SCtgDropDBMsg { SCatalog* pCtg; char dbFName[TSDB_DB_FNAME_LEN]; uint64_t dbId; -} SCtgRemoveDBMsg; +} SCtgDropDBMsg; -typedef struct SCtgRemoveStbMsg { +typedef struct SCtgDropDbVgroupMsg { + SCatalog* pCtg; + char dbFName[TSDB_DB_FNAME_LEN]; +} SCtgDropDbVgroupMsg; + + +typedef struct SCtgDropStbMetaMsg { SCatalog* pCtg; char dbFName[TSDB_DB_FNAME_LEN]; char stbName[TSDB_TABLE_NAME_LEN]; uint64_t dbId; uint64_t suid; -} SCtgRemoveStbMsg; +} SCtgDropStbMetaMsg; -typedef struct SCtgRemoveTblMsg { +typedef struct SCtgDropTblMetaMsg { SCatalog* pCtg; char dbFName[TSDB_DB_FNAME_LEN]; char tbName[TSDB_TABLE_NAME_LEN]; uint64_t dbId; -} SCtgRemoveTblMsg; +} SCtgDropTblMetaMsg; typedef struct SCtgUpdateUserMsg { SCatalog* pCtg; @@ -451,6 +458,7 @@ int32_t ctgGetTbMetaFromCache(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMeta** pTabl int32_t ctgOpUpdateVgroup(SCtgCacheOperation *action); int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *action); int32_t ctgOpDropDbCache(SCtgCacheOperation *action); +int32_t ctgOpDropDbVgroup(SCtgCacheOperation *action); int32_t ctgOpDropStbMeta(SCtgCacheOperation *action); int32_t ctgOpDropTbMeta(SCtgCacheOperation *action); int32_t ctgOpUpdateUser(SCtgCacheOperation *action); @@ -464,6 +472,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** int32_t ctgReadTbVerFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType, uint64_t *suid, char *stbName); int32_t ctgChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass); int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId); +int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncReq); int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq); int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncReq); int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncReq); diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 312f0c9250..47ef45d7d7 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -286,6 +286,9 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* int32_t taskIdx = 0; for (int32_t i = 0; i < dbVgNum; ++i) { char* dbFName = taosArrayGet(pReq->pDbVgroup, i); + if (pReq->forceUpdate) { + ctgDropDbVgroupEnqueue(pCtg, dbFName, true); + } CTG_ERR_JRET(ctgInitGetDbVgTask(pJob, taskIdx++, dbFName)); } @@ -301,6 +304,9 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* for (int32_t i = 0; i < tbMetaNum; ++i) { SName* name = taosArrayGet(pReq->pTableMeta, i); + if (pReq->forceUpdate) { + catalogRemoveTableMeta(pCtg, name); + } CTG_ERR_JRET(ctgInitGetTbMetaTask(pJob, taskIdx++, name)); } diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 0f1344c343..8332c7b068 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -35,6 +35,11 @@ SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = { "drop DB", ctgOpDropDbCache }, + { + CTG_OP_DROP_DB_VGROUP, + "drop DBVgroup", + ctgOpDropDbVgroup + }, { CTG_OP_DROP_STB_META, "drop stbMeta", @@ -563,9 +568,9 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) { int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) { int32_t code = 0; SCtgCacheOperation action= {.opId = CTG_OP_DROP_DB_CACHE}; - SCtgRemoveDBMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveDBMsg)); + SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg)); if (NULL == msg) { - ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveDBMsg)); + ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg)); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } @@ -590,13 +595,43 @@ _return: CTG_RET(code); } +int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncOp) { + int32_t code = 0; + SCtgCacheOperation action= {.opId = CTG_OP_DROP_DB_VGROUP, .syncOp = syncOp}; + SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg)); + if (NULL == msg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg)); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + char *p = strchr(dbFName, '.'); + if (p && CTG_IS_SYS_DBNAME(p + 1)) { + dbFName = p + 1; + } + + msg->pCtg = pCtg; + strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); + + action.data = msg; + + CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); + + return TSDB_CODE_SUCCESS; + +_return: + + taosMemoryFreeClear(action.data); + CTG_RET(code); +} + + int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncOp) { int32_t code = 0; SCtgCacheOperation action= {.opId = CTG_OP_DROP_STB_META, .syncOp = syncOp}; - SCtgRemoveStbMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveStbMsg)); + SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg)); if (NULL == msg) { - ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveStbMsg)); + ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg)); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } @@ -623,9 +658,9 @@ _return: int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncOp) { int32_t code = 0; SCtgCacheOperation action= {.opId = CTG_OP_DROP_TB_META, .syncOp = syncOp}; - SCtgRemoveTblMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveTblMsg)); + SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg)); if (NULL == msg) { - ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveTblMsg)); + ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg)); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } @@ -1281,7 +1316,7 @@ _return: int32_t ctgOpDropDbCache(SCtgCacheOperation *operation) { int32_t code = 0; - SCtgRemoveDBMsg *msg = operation->data; + SCtgDropDBMsg *msg = operation->data; SCatalog* pCtg = msg->pCtg; SCtgDBCache *dbCache = NULL; @@ -1304,6 +1339,33 @@ _return: CTG_RET(code); } +int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) { + int32_t code = 0; + SCtgDropDbVgroupMsg *msg = operation->data; + SCatalog* pCtg = msg->pCtg; + + SCtgDBCache *dbCache = NULL; + ctgGetDBCache(msg->pCtg, msg->dbFName, &dbCache); + if (NULL == dbCache) { + goto _return; + } + + CTG_ERR_RET(ctgWAcquireVgInfo(pCtg, dbCache)); + + ctgFreeVgInfo(dbCache->vgInfo); + dbCache->vgInfo = NULL; + + ctgDebug("db vgInfo removed, dbFName:%s", msg->dbFName); + + ctgWReleaseVgInfo(dbCache); + +_return: + + taosMemoryFreeClear(msg); + + CTG_RET(code); +} + int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) { int32_t code = 0; @@ -1353,7 +1415,7 @@ _return: int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) { int32_t code = 0; - SCtgRemoveStbMsg *msg = operation->data; + SCtgDropStbMetaMsg *msg = operation->data; SCatalog* pCtg = msg->pCtg; SCtgDBCache *dbCache = NULL; @@ -1399,7 +1461,7 @@ _return: int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) { int32_t code = 0; - SCtgRemoveTblMsg *msg = operation->data; + SCtgDropTblMetaMsg *msg = operation->data; SCatalog* pCtg = msg->pCtg; SCtgDBCache *dbCache = NULL; diff --git a/source/libs/catalog/src/ctgDbg.c b/source/libs/catalog/src/ctgDbg.c index fdab50db0f..cb18e45638 100644 --- a/source/libs/catalog/src/ctgDbg.c +++ b/source/libs/catalog/src/ctgDbg.c @@ -132,7 +132,7 @@ void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) { } } -int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId) { +int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, bool forceUpdate) { int32_t code = 0; SCatalogReq req = {0}; req.pTableMeta = taosArrayInit(2, sizeof(SName)); @@ -144,6 +144,7 @@ int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps req.pIndex = NULL;//taosArrayInit(2, TSDB_INDEX_FNAME_LEN); req.pUser = taosArrayInit(2, sizeof(SUserAuthInfo)); req.qNodeRequired = true; + req.forceUpdate = forceUpdate; SName name = {0}; char dbFName[TSDB_DB_FNAME_LEN] = {0}; diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 3e23c395c9..be7b930d79 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -377,15 +377,21 @@ int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_ return TSDB_CODE_SUCCESS; } - taosHashRemove(pTask->execNodes, &execIdx, sizeof(execIdx)); + if (taosHashRemove(pTask->execNodes, &execIdx, sizeof(execIdx))) { + SCH_TASK_ELOG("fail to remove execIdx %d from execNodeList", execIdx); + } else { + SCH_TASK_DLOG("execIdx %d removed from execNodeList", execIdx); + } + if (execIdx != pTask->execIdx) { // ignore it + SCH_TASK_DLOG("execIdx %d is not current execIdx %d", execIdx, pTask->execIdx); SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR); } return TSDB_CODE_SUCCESS; } -int32_t schUpdateTaskExecNode(SSchTask *pTask, void *handle, int32_t execIdx) { +int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execIdx) { if (taosHashGetSize(pTask->execNodes) <= 0) { return TSDB_CODE_SUCCESS; } @@ -393,6 +399,8 @@ int32_t schUpdateTaskExecNode(SSchTask *pTask, void *handle, int32_t execIdx) { SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execIdx, sizeof(execIdx)); nodeInfo->handle = handle; + SCH_TASK_DLOG("handle updated to %p for execIdx %d", handle, execIdx); + return TSDB_CODE_SUCCESS; } @@ -403,7 +411,7 @@ int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, v SCH_SET_TASK_HANDLE(pTask, handle); - schUpdateTaskExecNode(pTask, handle, execIdx); + schUpdateTaskExecNode(pJob, pTask, handle, execIdx); return TSDB_CODE_SUCCESS; } @@ -551,6 +559,8 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + SCH_TASK_DLOG("set %dth condidate addr, id %d, fqdn:%s, port:%d", i, naddr->nodeId, SCH_GET_CUR_EP(naddr)->fqdn, SCH_GET_CUR_EP(naddr)->port); + ++addNum; } } @@ -1110,6 +1120,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { SCH_UNLOCK(SCH_WRITE, &parent->lock); if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) { + SCH_TASK_DLOG("all %d children task done, start to launch parent task %" PRIx64, readyNum, parent->taskId); SCH_ERR_RET(schLaunchTask(pJob, parent)); } } @@ -1186,7 +1197,7 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) { nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo); } - SCH_TASK_DLOG("task has %d exec address", size); + SCH_TASK_DLOG("task has been dropped on %d exec nodes", size); } @@ -1196,7 +1207,8 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { } SCH_LOCK_TASK(pTask); - if (JOB_TASK_STATUS_EXECUTING == pTask->status && pJob->fetchTask != pTask) { + if (JOB_TASK_STATUS_EXECUTING == pTask->status && pJob->fetchTask != pTask && taosArrayGetSize(pTask->candidateAddrs) > 1) { + SCH_TASK_DLOG("task execIdx %d will be rescheduled now", pTask->execIdx); schDropTaskOnExecNode(pJob, pTask); taosHashClear(pTask->execNodes); schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR); @@ -1306,9 +1318,10 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { int32_t code = 0; atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1); - pTask->execIdx++; + SCH_TASK_DLOG("start to launch task's %dth exec", pTask->execIdx); + SCH_LOG_TASK_START_TS(pTask); if (schJobNeedToStop(pJob, &status)) { diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index b336ce8c76..1389459604 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -1037,6 +1037,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, if (NULL == addr) { addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); isCandidateAddr = true; + SCH_TASK_DLOG("target candidateIdx %d", pTask->candidateIdx); } SEpSet epSet = addr->epSet; -- GitLab