diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 9bc2ae42687e4510765fee007badab1ba01cc9be..7ebe7e465ca24595d2f93ec08fa1513a92e7c87b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3053,6 +3053,7 @@ int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes); int32_t tDecodeDeleteRes(SDecoder* pCoder, SDeleteRes* pRes); typedef struct { + int32_t msgIdx; int32_t msgType; int32_t msgLen; void* msg; @@ -3066,6 +3067,7 @@ typedef struct { typedef struct { int32_t reqType; + int32_t msgIdx; int32_t msgLen; int32_t rspCode; void* msg; diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index 2beeb10335b02c6d08a75c0f2c498c825e56de54..654f46ec85682a21e8ef0009c3fcb654180c93b1 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -84,6 +84,9 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) { } for (int32_t i = 0; i < msgNum; ++i) { + req.msgIdx = ntohl(*(int32_t*)((char*)pMsg->pCont + offset)); + offset += sizeof(req.msgIdx); + req.msgType = ntohl(*(int32_t*)((char*)pMsg->pCont + offset)); offset += sizeof(req.msgType); @@ -111,6 +114,7 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) { } else { rsp.rspCode = 0; } + rsp.msgIdx = req.msgIdx; rsp.reqType = reqMsg.msgType; rsp.msgLen = reqMsg.info.rspLen; rsp.msg = reqMsg.info.rsp; @@ -136,6 +140,8 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) { *(int32_t*)((char*)pRsp + offset) = htonl(p->reqType); offset += sizeof(p->reqType); + *(int32_t*)((char*)pRsp + offset) = htonl(p->msgIdx); + offset += sizeof(p->msgIdx); *(int32_t*)((char*)pRsp + offset) = htonl(p->msgLen); offset += sizeof(p->msgLen); *(int32_t*)((char*)pRsp + offset) = htonl(p->rspCode); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index d18ba88268c6f1a1700f5b13c8255c9b24c0c71b..707c4bc4717b08c4a392a4974fbda2877cd68368 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -273,6 +273,9 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) { } for (int32_t i = 0; i < msgNum; ++i) { + req.msgIdx = ntohl(*(int32_t *)((char *)pMsg->pCont + offset)); + offset += sizeof(req.msgIdx); + req.msgType = ntohl(*(int32_t *)((char *)pMsg->pCont + offset)); offset += sizeof(req.msgType); @@ -301,6 +304,7 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) { break; } + rsp.msgIdx = req.msgIdx; rsp.reqType = reqMsg.msgType; rsp.msgLen = reqMsg.contLen; rsp.rspCode = reqMsg.code; @@ -327,6 +331,8 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) { *(int32_t *)((char *)pRsp + offset) = htonl(p->reqType); offset += sizeof(p->reqType); + *(int32_t *)((char *)pRsp + offset) = htonl(p->msgIdx); + offset += sizeof(p->msgIdx); *(int32_t *)((char *)pRsp + offset) = htonl(p->msgLen); offset += sizeof(p->msgLen); *(int32_t *)((char *)pRsp + offset) = htonl(p->rspCode); diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 816309beb15ee171cbc72773c5bbc98ac51eb5ce..a4e47e8053df928e9e93355089f79526600e69ed 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -80,6 +80,7 @@ typedef enum { CTG_TASK_GET_UDF, CTG_TASK_GET_USER, CTG_TASK_GET_SVR_VER, + CTG_TASK_GET_TB_META_BATCH, } CTG_TASK_TYPE; typedef enum { @@ -110,6 +111,22 @@ typedef struct SCtgTbMetaCtx { int32_t flag; } SCtgTbMetaCtx; +typedef struct SCtgFetch { + int32_t reqIdx; + int32_t fetchIdx; + int32_t flag; + SCtgTbCacheInfo tbInfo; + int32_t vgId; +} SCtgFetch; + +typedef struct SCtgTbMetaBCtx { + int32_t fetchNum; + SArray* pNames; + SArray* pTbMetas; + SArray* pFetchs; +} SCtgTbMetaBCtx; + + typedef struct SCtgTbIndexCtx { SName* pName; } SCtgTbIndexCtx; @@ -218,6 +235,7 @@ typedef struct SCtgJob { int32_t batchId; SHashObj* pBatchs; SArray* pTasks; + int32_t subTaskNum; int32_t taskDone; SMetaData jobRes; int32_t jobResCode; @@ -276,6 +294,7 @@ typedef struct SCtgTask { int32_t taskId; SCtgJob* pJob; void* taskCtx; + SArray* msgCtxs; SCtgMsgCtx msgCtx; int32_t code; void* res; @@ -284,6 +303,7 @@ typedef struct SCtgTask { SArray* pParents; SCtgSubRes subRes; SHashObj* pBatchs; + int32_t msgIdx; } SCtgTask; typedef int32_t (*ctgInitTaskFp)(SCtgJob*, int32_t, void*); @@ -487,6 +507,8 @@ typedef struct SCtgOperation { #define CTG_FLAG_MAKE_STB(_isStb) (((_isStb) == 1) ? CTG_FLAG_STB : ((_isStb) == 0 ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB)) #define CTG_FLAG_MATCH_STB(_flag, tbType) (CTG_FLAG_IS_UNKNOWN_STB(_flag) || (CTG_FLAG_IS_STB(_flag) && (tbType) == TSDB_SUPER_TABLE) || (CTG_FLAG_IS_NOT_STB(_flag) && (tbType) != TSDB_SUPER_TABLE)) +#define CTG_GET_TASK_MSGCTX(_task) ((CTG_TASK_GET_TB_META_BATCH == (_task)->type) ? taosArrayGet((_task)->msgCtxs, (_task)->msgIdx) : &(_task)->msgCtx) + #define CTG_META_SIZE(pMeta) (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema)) #define CTG_TABLE_NOT_EXIST(code) (code == CTG_ERR_CODE_TABLE_NOT_EXIST) @@ -586,6 +608,7 @@ int32_t ctgdShowCacheInfo(void); int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq); int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta); +int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaBCtx* ctx, SArray** pTbMetas); int32_t ctgOpUpdateVgroup(SCtgCacheOperation *action); int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *action); @@ -672,6 +695,7 @@ int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2); int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2); void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput); int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target); +int32_t ctgAddMsgCtx(SArray* pCtxs, int32_t reqType, void* out, char* target); char * ctgTaskTypeStr(CTG_TASK_TYPE type); int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, char* dbFName, int32_t vgId); int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes); diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 0184ac3a607a3dbf6c0ddec6cd9c8ea2df3e3781..8aa33b759afb013a806f34f2274ee3d6fe8b942c 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -50,6 +50,31 @@ int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, void* param) { return TSDB_CODE_SUCCESS; } +int32_t ctgInitGetTbMetaBTask(SCtgJob *pJob, int32_t taskIdx, void* param) { + SName *name = (SName*)param; + SCtgTask task = {0}; + + task.type = CTG_TASK_GET_TB_META_BATCH; + task.taskId = taskIdx; + task.pJob = pJob; + + task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbMetaBCtx)); + if (NULL == task.taskCtx) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + SCtgTbMetaBCtx* ctx = task.taskCtx; + ctx->pNames = param; + ctx->pTbMetas = taosArrayInit(taosArrayGetSize(ctx->pNames), sizeof(SMetaRes)); + + taosArrayPush(pJob->pTasks, &task); + + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbNum:%d", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames)); + + return TSDB_CODE_SUCCESS; +} + + int32_t ctgInitGetDbVgTask(SCtgJob *pJob, int32_t taskIdx, void* param) { char *dbFName = (char*)param; SCtgTask task = {0}; @@ -328,7 +353,6 @@ int32_t ctgInitGetTbCfgTask(SCtgJob *pJob, int32_t taskIdx, void* param) { } - int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, const SCatalogReq* pReq) { SHashObj* pDb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); if (NULL == pDb) { @@ -452,7 +476,8 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const } SCtgJob *pJob = *job; - + + pJob->subTaskNum = taskNum; pJob->queryId = pConn->requestId; pJob->userFp = fp; pJob->pCtg = pCtg; @@ -506,10 +531,16 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_DB_INFO, dbFName, NULL)); } +#if 0 for (int32_t i = 0; i < tbMetaNum; ++i) { SName* name = taosArrayGet(pReq->pTableMeta, i); CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_META, name, NULL)); } +#else + if (tbMetaNum > 0) { + CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_META_BATCH, pReq->pTableMeta, NULL)); + } +#endif for (int32_t i = 0; i < tbHashNum; ++i) { SName* name = taosArrayGet(pReq->pTableHash, i); @@ -586,6 +617,15 @@ int32_t ctgDumpTbMetaRes(SCtgTask* pTask) { return TSDB_CODE_SUCCESS; } +int32_t ctgDumpTbMetaBRes(SCtgTask* pTask) { + SCtgJob* pJob = pTask->pJob; + + pJob->jobRes.pTableMeta = pTask->res; + + return TSDB_CODE_SUCCESS; +} + + int32_t ctgDumpDbVgRes(SCtgTask* pTask) { SCtgJob* pJob = pTask->pJob; if (NULL == pJob->jobRes.pDbVgroup) { @@ -847,43 +887,55 @@ _return: int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; SCtgDBCache *dbCache = NULL; - SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask); +#if CTG_BATCH_FETCH + SCtgTbMetaBCtx* ctx = (SCtgTbMetaBCtx*)pTask->taskCtx; + SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, pTask->msgIdx); + SName* pName = taosArrayGet(ctx->pNames, pFetch->reqIdx); + int32_t flag = pFetch->flag; + int32_t* vgId = &pFetch->vgId; +#else + SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; + SName* pName = ctx->pName; + int32_t flag = ctx->flag; + int32_t* vgId = &ctx->vgId; +#endif - CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); + CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target)); switch (reqType) { case TDMT_MND_USE_DB: { - SUseDbOutput* pOut = (SUseDbOutput*)pTask->msgCtx.out; + SUseDbOutput* pOut = (SUseDbOutput*)pMsgCtx->out; SVgroupInfo vgInfo = {0}; - CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, pOut->dbVgroup, ctx->pName, &vgInfo)); + CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, pOut->dbVgroup, pName, &vgInfo)); - ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag); + ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag); - ctx->vgId = vgInfo.vgId; - CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, ctx->pName, &vgInfo, NULL, pTask)); + *vgId = vgInfo.vgId; + CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, pTask)); return TSDB_CODE_SUCCESS; } case TDMT_MND_TABLE_META: { - STableMetaOutput* pOut = (STableMetaOutput*)pTask->msgCtx.out; + STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out; if (CTG_IS_META_NULL(pOut->metaType)) { - if (CTG_FLAG_IS_STB(ctx->flag)) { + if (CTG_FLAG_IS_STB(flag)) { char dbFName[TSDB_DB_FNAME_LEN] = {0}; - tNameGetFullDbName(ctx->pName, dbFName); + tNameGetFullDbName(pName, dbFName); CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache)); if (NULL != dbCache) { SVgroupInfo vgInfo = {0}; - CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, ctx->pName, &vgInfo)); + CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pName, &vgInfo)); - ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag); + ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag); - ctx->vgId = vgInfo.vgId; - CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, ctx->pName, &vgInfo, NULL, pTask)); + *vgId = vgInfo.vgId; + CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, pTask)); ctgReleaseVgInfoToCache(pCtg, dbCache); } else { @@ -898,52 +950,58 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf * return TSDB_CODE_SUCCESS; } - ctgError("no tbmeta got, tbName:%s", tNameGetTableName(ctx->pName)); - ctgRemoveTbMetaFromCache(pCtg, ctx->pName, false); + ctgError("no tbmeta got, tbName:%s", tNameGetTableName(pName)); + ctgRemoveTbMetaFromCache(pCtg, pName, false); CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST); } - if (pTask->msgCtx.lastOut) { - TSWAP(pTask->msgCtx.out, pTask->msgCtx.lastOut); - STableMetaOutput* pLastOut = (STableMetaOutput*)pTask->msgCtx.out; + if (pMsgCtx->lastOut) { + TSWAP(pMsgCtx->out, pMsgCtx->lastOut); + STableMetaOutput* pLastOut = (STableMetaOutput*)pMsgCtx->out; TSWAP(pLastOut->tbMeta, pOut->tbMeta); } break; } case TDMT_VND_TABLE_META: { - STableMetaOutput* pOut = (STableMetaOutput*)pTask->msgCtx.out; + STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out; if (CTG_IS_META_NULL(pOut->metaType)) { - ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(ctx->pName)); - ctgRemoveTbMetaFromCache(pCtg, ctx->pName, false); + ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(pName)); + ctgRemoveTbMetaFromCache(pCtg, pName, false); CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST); } - if (CTG_FLAG_IS_STB(ctx->flag)) { + if (CTG_FLAG_IS_STB(flag)) { break; } if (CTG_IS_META_TABLE(pOut->metaType) && TSDB_SUPER_TABLE == pOut->tbMeta->tableType) { - ctgDebug("will continue to refresh tbmeta since got stb, tbName:%s", tNameGetTableName(ctx->pName)); + ctgDebug("will continue to refresh tbmeta since got stb, tbName:%s", tNameGetTableName(pName)); taosMemoryFreeClear(pOut->tbMeta); - CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, ctx->pName, NULL, pTask)); + CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, pName, NULL, pTask)); } else if (CTG_IS_META_BOTH(pOut->metaType)) { int32_t exist = 0; - if (!CTG_FLAG_IS_FORCE_UPDATE(ctx->flag)) { - CTG_ERR_JRET(ctgTbMetaExistInCache(pCtg, pOut->dbFName, pOut->tbName, &exist)); + if (!CTG_FLAG_IS_FORCE_UPDATE(flag)) { + SName stbName = *pName; + strcpy(stbName.tname, pOut->tbName); + SCtgTbMetaCtx stbCtx = {0}; + stbCtx.flag = flag; + stbCtx.pName = &stbName; + + taosMemoryFreeClear(pOut->tbMeta); + CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta)); + if (pOut->tbMeta) { + exist = 1; + } } if (0 == exist) { - TSWAP(pTask->msgCtx.lastOut, pTask->msgCtx.out); + TSWAP(pMsgCtx->lastOut, pMsgCtx->out); CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pOut->dbFName, pOut->tbName, NULL, pTask)); - } else { - taosMemoryFreeClear(pOut->tbMeta); - - SET_META_TYPE_CTABLE(pOut->metaType); } } break; @@ -954,17 +1012,20 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf * break; } - STableMetaOutput* pOut = (STableMetaOutput*)pTask->msgCtx.out; + STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out; ctgUpdateTbMetaToCache(pCtg, pOut, false); if (CTG_IS_META_BOTH(pOut->metaType)) { memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta)); - } else if (CTG_IS_META_CTABLE(pOut->metaType)) { - SName stbName = *ctx->pName; + } + +/* + else if (CTG_IS_META_CTABLE(pOut->metaType)) { + SName stbName = *pName; strcpy(stbName.tname, pOut->tbName); SCtgTbMetaCtx stbCtx = {0}; - stbCtx.flag = ctx->flag; + stbCtx.flag = flag; stbCtx.pName = &stbName; CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta)); @@ -977,8 +1038,19 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf * memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta)); } +*/ +#if CTG_BATCH_FETCH + SMetaRes* pRes = taosArrayGet(ctx->pTbMetas, pFetch->reqIdx); + pRes->code = 0; + pRes->pRes = pOut->tbMeta; + pOut->tbMeta = NULL; + if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) { + TSWAP(pTask->res, ctx->pTbMetas); + } +#else TSWAP(pTask->res, pOut->tbMeta); +#endif _return: @@ -986,8 +1058,10 @@ _return: ctgReleaseVgInfoToCache(pCtg, dbCache); } - ctgHandleTaskEnd(pTask, code); - + if (pTask->res || code) { + ctgHandleTaskEnd(pTask, code); + } + CTG_RET(code); } @@ -1224,38 +1298,37 @@ _return: CTG_RET(code); } -int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask) { +int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask, int32_t flag, SName* pName, int32_t* vgId) { SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; int32_t code = 0; - SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; - if (CTG_FLAG_IS_SYS_DB(ctx->flag)) { - ctgDebug("will refresh sys db tbmeta, tbName:%s", tNameGetTableName(ctx->pName)); + if (CTG_FLAG_IS_SYS_DB(flag)) { + ctgDebug("will refresh sys db tbmeta, tbName:%s", tNameGetTableName(pName)); - CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, (char *)ctx->pName->dbname, (char *)ctx->pName->tname, NULL, pTask)); + CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, (char *)pName->dbname, (char *)pName->tname, NULL, pTask)); } - if (CTG_FLAG_IS_STB(ctx->flag)) { - ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s", tNameGetTableName(ctx->pName)); + if (CTG_FLAG_IS_STB(flag)) { + ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s", tNameGetTableName(pName)); // if get from mnode failed, will not try vnode - CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, ctx->pName, NULL, pTask)); + CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, pName, NULL, pTask)); } SCtgDBCache *dbCache = NULL; char dbFName[TSDB_DB_FNAME_LEN] = {0}; - tNameGetFullDbName(ctx->pName, dbFName); + tNameGetFullDbName(pName, dbFName); CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache)); if (dbCache) { SVgroupInfo vgInfo = {0}; - CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, ctx->pName, &vgInfo)); + CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pName, &vgInfo)); - ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag); + ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag); - ctx->vgId = vgInfo.vgId; - CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, ctx->pName, &vgInfo, NULL, pTask)); + *vgId = vgInfo.vgId; + CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, pTask)); } else { SBuildUseDBInput input = {0}; @@ -1284,11 +1357,40 @@ int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) { return TSDB_CODE_SUCCESS; } - CTG_ERR_RET(ctgAsyncRefreshTbMeta(pTask)); + SCtgTbMetaCtx* pCtx = (SCtgTbMetaCtx*)pTask->taskCtx; + CTG_ERR_RET(ctgAsyncRefreshTbMeta(pTask, pCtx->flag, pCtx->pName, &pCtx->vgId)); return TSDB_CODE_SUCCESS; } +int32_t ctgLaunchGetTbMetaBTask(SCtgTask *pTask) { + SCatalog* pCtg = pTask->pJob->pCtg; + SRequestConnInfo* pConn = &pTask->pJob->conn; + + CTG_ERR_RET(ctgGetTbMetaBFromCache(pCtg, pConn, (SCtgTbMetaBCtx*)pTask->taskCtx, (SArray**)&pTask->res)); + if (pTask->res) { + CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0)); + return TSDB_CODE_SUCCESS; + } + + SCtgTbMetaBCtx* pCtx = (SCtgTbMetaBCtx*)pTask->taskCtx; + pCtx->fetchNum = taosArrayGetSize(pCtx->pFetchs); + pTask->msgCtxs = taosArrayInit(pCtx->fetchNum, sizeof(SCtgMsgCtx)); + + for (int32_t i = 0; i < pCtx->fetchNum; ++i) { + SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i); + SName* pName = taosArrayGet(pCtx->pNames, pFetch->reqIdx); + + pTask->msgIdx = pFetch->fetchIdx; + CTG_ERR_RET(ctgAddMsgCtx(pTask->msgCtxs, 0, NULL, NULL)); + CTG_ERR_RET(ctgAsyncRefreshTbMeta(pTask, pFetch->flag, pName, &pFetch->vgId)); + } + + pTask->msgIdx = 0; + + return TSDB_CODE_SUCCESS; +} + int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) { int32_t code = 0; SCatalog* pCtg = pTask->pJob->pCtg; @@ -1597,19 +1699,20 @@ int32_t ctgCloneDbVg(SCtgTask* pTask, void** pRes) { SCtgAsyncFps gCtgAsyncFps[] = { - {ctgInitGetQnodeTask, ctgLaunchGetQnodeTask, ctgHandleGetQnodeRsp, ctgDumpQnodeRes, NULL, NULL}, - {ctgInitGetDnodeTask, ctgLaunchGetDnodeTask, ctgHandleGetDnodeRsp, ctgDumpDnodeRes, NULL, NULL}, - {ctgInitGetDbVgTask, ctgLaunchGetDbVgTask, ctgHandleGetDbVgRsp, ctgDumpDbVgRes, ctgCompDbVgTasks, ctgCloneDbVg}, - {ctgInitGetDbCfgTask, ctgLaunchGetDbCfgTask, ctgHandleGetDbCfgRsp, ctgDumpDbCfgRes, NULL, NULL}, - {ctgInitGetDbInfoTask, ctgLaunchGetDbInfoTask, ctgHandleGetDbInfoRsp, ctgDumpDbInfoRes, NULL, NULL}, - {ctgInitGetTbMetaTask, ctgLaunchGetTbMetaTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetaRes, ctgCompTbMetaTasks, ctgCloneTbMeta}, - {ctgInitGetTbHashTask, ctgLaunchGetTbHashTask, ctgHandleGetTbHashRsp, ctgDumpTbHashRes, NULL, NULL}, - {ctgInitGetTbIndexTask, ctgLaunchGetTbIndexTask, ctgHandleGetTbIndexRsp, ctgDumpTbIndexRes, NULL, NULL}, - {ctgInitGetTbCfgTask, ctgLaunchGetTbCfgTask, ctgHandleGetTbCfgRsp, ctgDumpTbCfgRes, NULL, NULL}, - {ctgInitGetIndexTask, ctgLaunchGetIndexTask, ctgHandleGetIndexRsp, ctgDumpIndexRes, NULL, NULL}, - {ctgInitGetUdfTask, ctgLaunchGetUdfTask, ctgHandleGetUdfRsp, ctgDumpUdfRes, NULL, NULL}, - {ctgInitGetUserTask, ctgLaunchGetUserTask, ctgHandleGetUserRsp, ctgDumpUserRes, NULL, NULL}, - {ctgInitGetSvrVerTask, ctgLaunchGetSvrVerTask, ctgHandleGetSvrVerRsp, ctgDumpSvrVer, NULL, NULL}, + {ctgInitGetQnodeTask, ctgLaunchGetQnodeTask, ctgHandleGetQnodeRsp, ctgDumpQnodeRes, NULL, NULL}, + {ctgInitGetDnodeTask, ctgLaunchGetDnodeTask, ctgHandleGetDnodeRsp, ctgDumpDnodeRes, NULL, NULL}, + {ctgInitGetDbVgTask, ctgLaunchGetDbVgTask, ctgHandleGetDbVgRsp, ctgDumpDbVgRes, ctgCompDbVgTasks, ctgCloneDbVg}, + {ctgInitGetDbCfgTask, ctgLaunchGetDbCfgTask, ctgHandleGetDbCfgRsp, ctgDumpDbCfgRes, NULL, NULL}, + {ctgInitGetDbInfoTask, ctgLaunchGetDbInfoTask, ctgHandleGetDbInfoRsp, ctgDumpDbInfoRes, NULL, NULL}, + {ctgInitGetTbMetaTask, ctgLaunchGetTbMetaTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetaRes, ctgCompTbMetaTasks, ctgCloneTbMeta}, + {ctgInitGetTbHashTask, ctgLaunchGetTbHashTask, ctgHandleGetTbHashRsp, ctgDumpTbHashRes, NULL, NULL}, + {ctgInitGetTbIndexTask, ctgLaunchGetTbIndexTask, ctgHandleGetTbIndexRsp, ctgDumpTbIndexRes, NULL, NULL}, + {ctgInitGetTbCfgTask, ctgLaunchGetTbCfgTask, ctgHandleGetTbCfgRsp, ctgDumpTbCfgRes, NULL, NULL}, + {ctgInitGetIndexTask, ctgLaunchGetIndexTask, ctgHandleGetIndexRsp, ctgDumpIndexRes, NULL, NULL}, + {ctgInitGetUdfTask, ctgLaunchGetUdfTask, ctgHandleGetUdfRsp, ctgDumpUdfRes, NULL, NULL}, + {ctgInitGetUserTask, ctgLaunchGetUserTask, ctgHandleGetUserRsp, ctgDumpUserRes, NULL, NULL}, + {ctgInitGetSvrVerTask, ctgLaunchGetSvrVerTask, ctgHandleGetSvrVerRsp, ctgDumpSvrVer, NULL, NULL}, + {ctgInitGetTbMetaBTask, ctgLaunchGetTbMetaBTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetaBRes, NULL, NULL}, }; int32_t ctgMakeAsyncRes(SCtgJob *pJob) { diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 2e8e259151b3b19a06dfd271ad4c47c7b30673cb..1b5968d426fd6c6bae652a6412620a8cfe055de4 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -2152,6 +2152,60 @@ int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMet return TSDB_CODE_SUCCESS; } +int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaBCtx* ctx, SArray** pTbMetas) { + int32_t tbNum = taosArrayGetSize(ctx->pNames); + SName* fName = taosArrayGet(ctx->pNames, 0); + int32_t fIdx = 0; + + for (int32_t i = 0; i < tbNum; ++i) { + SName* pName = taosArrayGet(ctx->pNames, i); + SCtgTbMetaCtx nctx = {0}; + nctx.flag = CTG_FLAG_UNKNOWN_STB; + nctx.pName = pName; + + if (IS_SYS_DBNAME(pName->dbname)) { + CTG_FLAG_SET_SYS_DB(nctx.flag); + } + + STableMeta *pTableMeta = NULL; + CTG_ERR_RET(ctgReadTbMetaFromCache(pCtg, &nctx, &pTableMeta)); + SMetaRes res = {0}; + + if (pTableMeta) { + if (CTG_FLAG_MATCH_STB(nctx.flag, pTableMeta->tableType) && + ((!CTG_FLAG_IS_FORCE_UPDATE(nctx.flag)) || (CTG_FLAG_IS_SYS_DB(nctx.flag)))) { + res.pRes = pTableMeta; + } else { + taosMemoryFreeClear(pTableMeta); + } + } + + if (NULL == res.pRes) { + if (NULL == ctx->pFetchs) { + ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch)); + } + + if (CTG_FLAG_IS_UNKNOWN_STB(nctx.flag)) { + CTG_FLAG_SET_STB(nctx.flag, nctx.tbInfo.tbType); + } + + SCtgFetch fetch = {0}; + fetch.reqIdx = i; + fetch.fetchIdx = fIdx++; + fetch.flag = nctx.flag; + + taosArrayPush(ctx->pFetchs, &fetch); + } + + taosArrayPush(ctx->pTbMetas, &res); + } + + if (NULL == ctx->pFetchs) { + TSWAP(*pTbMetas, ctx->pTbMetas); + } + + return TSDB_CODE_SUCCESS; +} int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq) { int32_t code = 0; diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 0f97b5c5b196d4a5c6ef2ad48860cacfa88dccf6..d489496e3783943da9f6e49d8f7389d6d6296de5 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -46,6 +46,8 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu if (msgNum > 0) { rsp.reqType = ntohl(*(int32_t*)((char*)pMsg->pData + offset)); offset += sizeof(rsp.reqType); + rsp.msgIdx = ntohl(*(int32_t*)((char*)pMsg->pData + offset)); + offset += sizeof(rsp.msgIdx); rsp.msgLen = ntohl(*(int32_t*)((char*)pMsg->pData + offset)); offset += sizeof(rsp.msgLen); rsp.rspCode = ntohl(*(int32_t*)((char*)pMsg->pData + offset)); @@ -57,6 +59,7 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu taskMsg.pData = rsp.msg; taskMsg.len = rsp.msgLen; } else { + rsp.msgIdx = pTask->msgIdx++; rsp.reqType = -1; taskMsg.msgType = -1; taskMsg.pData = NULL; @@ -64,6 +67,7 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu } pTask->pBatchs = pBatchs; + pTask->msgIdx = rsp.msgIdx; ctgDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(taskMsg.msgType + 1)); @@ -431,19 +435,19 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT SHashObj* pBatchs = pTask->pBatchs; SCtgJob* pJob = pTask->pJob; SCtgBatch* pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId)); - int32_t taskNum = taosArrayGetSize(pTask->pJob->pTasks); SCtgBatch newBatch = {0}; SBatchMsg req = {0}; if (NULL == pBatch) { - newBatch.pMsgs = taosArrayInit(taskNum, sizeof(SBatchMsg)); - newBatch.pTaskIds = taosArrayInit(taskNum, sizeof(int32_t)); + newBatch.pMsgs = taosArrayInit(pJob->subTaskNum, sizeof(SBatchMsg)); + newBatch.pTaskIds = taosArrayInit(pJob->subTaskNum, sizeof(int32_t)); if (NULL == newBatch.pMsgs || NULL == newBatch.pTaskIds) { CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } newBatch.conn = *pConn; + req.msgIdx = pTask->msgIdx; req.msgType = msgType; req.msgLen = msgSize; req.msg = msg; @@ -456,16 +460,25 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT newBatch.msgSize = sizeof(SBatchReq) + sizeof(req) + msgSize - POINTER_BYTES; if (vgId > 0) { + SName* pName = NULL; if (TDMT_VND_TABLE_CFG == msgType) { SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx; - tNameGetFullDbName(ctx->pName, newBatch.dbFName); + pName = ctx->pName; } else if (TDMT_VND_TABLE_META == msgType) { - SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; - tNameGetFullDbName(ctx->pName, newBatch.dbFName); + if (CTG_TASK_GET_TB_META_BATCH == pTask->type) { + SCtgTbMetaBCtx* ctx = (SCtgTbMetaBCtx*)pTask->taskCtx; + SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, pTask->msgIdx); + pName = taosArrayGet(ctx->pNames, fetch->reqIdx); + } else { + SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; + pName = ctx->pName; + } } else { ctgError("invalid vnode msgType %d", msgType); CTG_ERR_JRET(TSDB_CODE_APP_ERROR); } + + tNameGetFullDbName(pName, newBatch.dbFName); } newBatch.msgType = (vgId > 0) ? TDMT_VND_BATCH_META : TDMT_MND_BATCH_META; @@ -480,6 +493,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT return TSDB_CODE_SUCCESS; } + req.msgIdx = pTask->msgIdx; req.msgType = msgType; req.msgLen = msgSize; req.msg = msg; @@ -492,16 +506,25 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT pBatch->msgSize += sizeof(req) + msgSize - POINTER_BYTES; if (vgId > 0) { + SName* pName = NULL; if (TDMT_VND_TABLE_CFG == msgType) { SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx; - tNameGetFullDbName(ctx->pName, newBatch.dbFName); + pName = ctx->pName; } else if (TDMT_VND_TABLE_META == msgType) { - SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; - tNameGetFullDbName(ctx->pName, newBatch.dbFName); + if (CTG_TASK_GET_TB_META_BATCH == pTask->type) { + SCtgTbMetaBCtx* ctx = (SCtgTbMetaBCtx*)pTask->taskCtx; + SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, pTask->msgIdx); + pName = taosArrayGet(ctx->pNames, fetch->reqIdx); + } else { + SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; + pName = ctx->pName; + } } else { ctgError("invalid vnode msgType %d", msgType); CTG_ERR_JRET(TSDB_CODE_APP_ERROR); } + + tNameGetFullDbName(pName, newBatch.dbFName); } ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pBatch->batchId, vgId); @@ -517,7 +540,7 @@ _return: } int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) { - *msg = taosMemoryMalloc(pBatch->msgSize); + *msg = taosMemoryCalloc(1, pBatch->msgSize); if (NULL == (*msg)) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } @@ -532,6 +555,8 @@ int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) { for (int32_t i = 0; i < num; ++i) { SBatchMsg* pReq = taosArrayGet(pBatch->pMsgs, i); + *(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgIdx); + offset += sizeof(pReq->msgIdx); *(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgType); offset += sizeof(pReq->msgType); *(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgLen); @@ -599,7 +624,8 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray if (NULL == pOut) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, NULL)); + + CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, pOut, NULL)); #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); @@ -645,7 +671,7 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray } if (pTask) { - CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, NULL)); + CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, NULL, NULL)); #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); @@ -696,7 +722,8 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU if (NULL == pOut) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, input->db)); + + CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, pOut, input->db)); #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); @@ -746,7 +773,8 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char if (NULL == pOut) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)dbFName)); + + CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, pOut, (char*)dbFName)); #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); @@ -796,7 +824,8 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const if (NULL == pOut) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)indexName)); + + CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, pOut, (char*)indexName)); #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); @@ -849,7 +878,7 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)tbFName)); + CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, pOut, (char*)tbFName)); #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); @@ -899,7 +928,8 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ch if (NULL == pOut) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)funcName)); + + CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, pOut, (char*)funcName)); #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); @@ -949,7 +979,8 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const if (NULL == pOut) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)user)); + + CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, pOut, (char*)user)); #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); @@ -1004,7 +1035,9 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char if (NULL == pOut) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, tbFName)); + + CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, pOut, tbFName)); + #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); #else @@ -1068,13 +1101,14 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa if (NULL == pOut) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, tbFName)); SRequestConnInfo vConn = {.pTrans = pConn->pTrans, .requestId = pConn->requestId, .requestObjRefId = pConn->requestObjRefId, .mgmtEps = vgroupInfo->epSet}; + CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, pOut, tbFName)); + #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, pTask, reqType, msg, msgLen)); #else @@ -1129,7 +1163,7 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S } if (pTask) { - CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, (char*)tbFName)); + CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, NULL, (char*)tbFName)); SRequestConnInfo vConn = {.pTrans = pConn->pTrans, .requestId = pConn->requestId, @@ -1188,7 +1222,8 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S } if (pTask) { - CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, (char*)tbFName)); + CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, NULL, (char*)tbFName)); + #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); #else @@ -1233,7 +1268,7 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **ou } if (pTask) { - CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, NULL)); + CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, NULL, NULL)); #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index bdf60b110b082ffef530b76dbc033ac2fa272506..9f0d0f71d5bb28bdfbed006a256136f46efbd09f 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -88,6 +88,8 @@ char *ctgTaskTypeStr(CTG_TASK_TYPE type) { return "[get user]"; case CTG_TASK_GET_SVR_VER: return "[get svr ver]"; + case CTG_TASK_GET_TB_META_BATCH: + return "[bget table meta]"; default: return "unknown"; } @@ -460,6 +462,15 @@ void ctgResetTbMetaTask(SCtgTask* pTask) { taosMemoryFreeClear(pTask->res); } +void ctgFreeBatchMeta(void* meta) { + if (NULL == meta) { + return; + } + + SMetaRes* pRes = (SMetaRes*)meta; + taosMemoryFreeClear(pRes->pRes); +} + void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) { switch (type) { case CTG_TASK_GET_QNODE: @@ -500,6 +511,15 @@ void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) { taosMemoryFreeClear(*pRes); break; } + case CTG_TASK_GET_TB_META_BATCH: { + SArray* pArray = (SArray*)*pRes; + int32_t num = taosArrayGetSize(pArray); + for (int32_t i = 0; i < num; ++i) { + ctgFreeBatchMeta(taosArrayGet(pArray, i)); + } + *pRes = NULL; // no need to free it + break; + } default: qError("invalid task type %d", type); break; @@ -554,6 +574,11 @@ void ctgFreeSubTaskRes(CTG_TASK_TYPE type, void **pRes) { taosMemoryFreeClear(*pRes); break; } + case CTG_TASK_GET_TB_META_BATCH: { + taosArrayDestroyEx(*pRes, ctgFreeBatchMeta); + *pRes = NULL; + break; + } default: qError("invalid task type %d", type); break; @@ -583,6 +608,21 @@ void ctgFreeTaskCtx(SCtgTask* pTask) { taosMemoryFreeClear(pTask->taskCtx); break; } + case CTG_TASK_GET_TB_META_BATCH: { + SCtgTbMetaBCtx* taskCtx = (SCtgTbMetaBCtx*)pTask->taskCtx; + taosArrayDestroyEx(taskCtx->pTbMetas, ctgFreeBatchMeta); + taosArrayDestroy(taskCtx->pFetchs); + // NO NEED TO FREE pNames + + taosArrayDestroyEx(pTask->msgCtxs, (FDelete)ctgFreeMsgCtx); + + if (pTask->msgCtx.lastOut) { + ctgFreeSTableMetaOutput((STableMetaOutput*)pTask->msgCtx.lastOut); + pTask->msgCtx.lastOut = NULL; + } + taosMemoryFreeClear(pTask->taskCtx); + break; + } case CTG_TASK_GET_TB_HASH: { SCtgTbHashCtx* taskCtx = (SCtgTbHashCtx*)pTask->taskCtx; taosMemoryFreeClear(taskCtx->pName); @@ -679,6 +719,23 @@ int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* targ return TSDB_CODE_SUCCESS; } +int32_t ctgAddMsgCtx(SArray* pCtxs, int32_t reqType, void* out, char* target) { + SCtgMsgCtx ctx = {0}; + + ctx.reqType = reqType; + ctx.out = out; + if (target) { + ctx.target = strdup(target); + if (NULL == ctx.target) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + } + + taosArrayPush(pCtxs, &ctx); + + return TSDB_CODE_SUCCESS; +} + int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) { switch (hashMethod) { diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index e8ffd98153a2e7ac32e4e7500cb0e1a5d14da0e4..6bf2529551be78b15886d9e69c7de72316b53eb2 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -387,10 +387,13 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int char * sql = strndup(msg->msg, msg->sqlLen); QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, SQL:%s", node, TMSG_INFO(pMsg->msgType), pMsg->info.handle, sql); - QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, sql)); + QW_ERR_JRET(qwProcessQuery(QW_FPARAMS(), &qwMsg, sql)); QW_SCH_TASK_DLOG("processQuery end, node:%p", node); - return TSDB_CODE_SUCCESS; +_return: + + taosMemoryFree(sql); + return code; } int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {