From 4cf20418cdd0d446517184077521c49ed349f562 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 26 Jul 2022 18:00:50 +0800 Subject: [PATCH] feat: support batch fetch in catalog --- include/common/tmsg.h | 22 ++ include/common/tmsgdef.h | 1 + source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 + source/dnode/vnode/src/inc/vnd.h | 4 +- source/dnode/vnode/src/vnd/vnodeQuery.c | 135 +++++++- source/dnode/vnode/src/vnd/vnodeSvr.c | 8 +- source/libs/catalog/inc/catalogInt.h | 18 + source/libs/catalog/inc/ctgRemote.h | 3 +- source/libs/catalog/src/ctgAsync.c | 21 +- source/libs/catalog/src/ctgRemote.c | 363 ++++++++++++++++++-- source/libs/catalog/src/ctgUtil.c | 34 +- source/libs/qcom/src/querymsg.c | 1 + 12 files changed, 559 insertions(+), 52 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 0d0eb841bc..9d23557ee7 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3045,6 +3045,28 @@ typedef struct SDeleteRes { int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes); int32_t tDecodeDeleteRes(SDecoder* pCoder, SDeleteRes* pRes); +typedef struct { + int32_t msgType; + int32_t msgLen; + void* msg; +} SBatchMsg; + +typedef struct { + int32_t reqType; + int32_t msgLen; + int32_t rspCode; + void* msg; +} SBatchRsp; + +static FORCE_INLINE void tFreeSBatchRsp(void *p) { + if (NULL == p) { + return; + } + + SBatchRsp* pRsp = (SBatchRsp*); + taosMemoryFree(pRsp->msg); +} + #pragma pack(pop) #ifdef __cplusplus diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 56e0935ce1..2d5a703b62 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -180,6 +180,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_TABLE_META, "vnode-table-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TABLES_META, "vnode-tables-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TABLE_CFG, "vnode-table-cfg", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_BATCH_META, "vnode-batch-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", SVCreateStbReq, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", SVDropStbReq, NULL) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 0471e2b850..9ef2350b6b 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -336,6 +336,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 984b34814d..44281ea38e 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -78,8 +78,8 @@ void vnodeBufPoolReset(SVBufPool* pPool); // vnodeQuery.c int32_t vnodeQueryOpen(SVnode* pVnode); void vnodeQueryClose(SVnode* pVnode); -int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg); -int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg); +int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct); +int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct); // vnodeCommit.c int32_t vnodeBegin(SVnode* pVnode); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 1c3e2f0514..bf846dba13 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -21,7 +21,7 @@ int vnodeQueryOpen(SVnode *pVnode) { void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); } -int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { +int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { STableInfoReq infoReq = {0}; STableMetaRsp metaRsp = {0}; SMetaReader mer1 = {0}; @@ -99,7 +99,12 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { goto _exit; } - pRsp = rpcMallocCont(rspLen); + if (direct) { + pRsp = rpcMallocCont(rspLen); + } else { + pRsp = taosMemoryCalloc(1, rspLen); + } + if (pRsp == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; @@ -117,15 +122,17 @@ _exit: qError("get table %s meta failed cause of %s", infoReq.tbName, tstrerror(code)); } - tmsgSendRsp(&rpcMsg); - + if (direct) { + tmsgSendRsp(&rpcMsg); + } + taosMemoryFree(metaRsp.pSchemas); metaReaderClear(&mer2); metaReaderClear(&mer1); return TSDB_CODE_SUCCESS; } -int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg) { +int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { STableCfgReq cfgReq = {0}; STableCfgRsp cfgRsp = {0}; SMetaReader mer1 = {0}; @@ -209,7 +216,12 @@ int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg) { goto _exit; } - pRsp = rpcMallocCont(rspLen); + if (direct) { + pRsp = rpcMallocCont(rspLen); + } else { + pRsp = taosMemoryCalloc(1, rspLen); + } + if (pRsp == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; @@ -227,14 +239,121 @@ _exit: qError("get table %s cfg failed cause of %s", cfgReq.tbName, tstrerror(code)); } - tmsgSendRsp(&rpcMsg); - + if (direct) { + tmsgSendRsp(&rpcMsg); + } + tFreeSTableCfgRsp(&cfgRsp); metaReaderClear(&mer2); metaReaderClear(&mer1); return TSDB_CODE_SUCCESS; } +int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) { + int32_t code = 0; + int32_t offset = 0; + int32_t rspSize = 0; + int32_t msgNum = ntohl(pMsg->pCont); + offset += sizeof(msgNum); + SBatchMsg req = {0}; + SBatchRsp rsp = {0}; + SRpcMsg reqMsg = *pMsg; + SRpcMsg rspMsg = {0}; + void* pRsp = NULL; + + SArray* batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp)); + if (NULL == batchRsp) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + for (int32_t i = 0; i < msgNum; ++i) { + req.msgType = ntohl((char*)pMsg->pCont + offset); + offset += req.msgType; + + req.msgLen = ntohl((char*)pMsg->pCont + offset); + offset += req.msgLen; + + req.msg = (char*)pMsg->pCont + offset; + offset += req.msgLen; + + reqMsg.msgType = req.msgType; + reqMsg.pCont = req.msg; + reqMsg.contLen = req.msgLen; + + switch (req.msgType) { + case TDMT_VND_TABLE_META: + vnodeGetTableMeta(pVnode, &reqMsg, false); + break; + case TDMT_VND_TABLE_CFG: + vnodeGetTableCfg(pVnode, &reqMsg, false); + break; + default: + qError("invalid req msgType %d", req.msgType); + reqMsg.code = TSDB_CODE_INVALID_MSG; + reqMsg.pCont = NULL; + reqMsg.contLen = 0; + break; + } + + rsp.reqType = reqMsg.msgType; + rsp.msgLen = reqMsg.contLen; + rsp.rspCode = reqMsg.code; + rsp.msg = reqMsg.pCont; + + taosArrayPush(batchRsp, &rsp); + + rspSize += sizeof(rsp) + rsp.msgLen - POINTER_BYTES; + } + + rspSize += sizeof(int32_t); + offset = 0; + + pRsp = rpcMallocCont(rspSize); + if (pRsp == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + *(int32_t*)((char*)pRsp + offset) = htonl(msgNum); + offset += sizeof(msgNum); + for (int32_t i = 0; i < msgNum; ++i) { + SBatchRsp *p = taosArrayGet(batchRsp, i); + + *(int32_t*)((char*)pRsp + offset) = htonl(p->reqType); + offset += sizeof(p->reqType); + *(int32_t*)((char*)pRsp + offset) = htonl(p->msgLen); + offset += sizeof(p->msgLen); + *(int32_t*)((char*)pRsp + offset) = htonl(p->rspCode); + offset += sizeof(p->rspCode); + memcpy((char*)pRsp + offset, p->msg, p->msgLen); + offset += p->msgLen; + + taosMemoryFreeClear(p->msg); + } + + taosArrayDestroy(batchRsp); + batchRsp = NULL; + +_exit: + + rspMsg.info = pMsg->info; + rspMsg.pCont = pRsp; + rspMsg.contLen = rspSize; + rspMsg.code = code; + rspMsg.msgType = pMsg->msgType; + + if (code) { + qError("get batch meta failed cause of %s", tstrerror(code)); + } + + taosArrayDestroyEx(batchRsp, tFreeSBatchRsp); + + tmsgSendRsp(&rspMsg); + + return TSDB_CODE_SUCCESS; +} + int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { pLoad->vgId = TD_VID(pVnode); pLoad->syncState = syncGetMyRole(pVnode->sync); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 9d06fbffdd..830ab6e37c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -296,7 +296,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { vTrace("message in fetch queue is processing"); if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || - pMsg->msgType == TDMT_VND_TABLE_CFG) && + pMsg->msgType == TDMT_VND_TABLE_CFG || pMsg->msgType = TDMT_VND_BATCH_META) && !vnodeIsLeader(pVnode)) { vnodeRedirectRpcMsg(pVnode, pMsg); return 0; @@ -318,9 +318,11 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { case TDMT_SCH_QUERY_HEARTBEAT: return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0); case TDMT_VND_TABLE_META: - return vnodeGetTableMeta(pVnode, pMsg); + return vnodeGetTableMeta(pVnode, pMsg, true); case TDMT_VND_TABLE_CFG: - return vnodeGetTableCfg(pVnode, pMsg); + return vnodeGetTableCfg(pVnode, pMsg, true); + case TDMT_VND_BATCH_META: + return vnodeGetBatchMeta(pVnode, pMsg); case TDMT_VND_CONSUME: return tqProcessPollReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_RUN: diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index bf3bc1f0f4..4bd85792cd 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -31,6 +31,7 @@ extern "C" { #define CTG_DEFAULT_RENT_SECOND 10 #define CTG_DEFAULT_RENT_SLOT_SIZE 10 #define CTG_DEFAULT_MAX_RETRY_TIMES 3 +#define CTG_DEFAULT_BATCH_NUM 64 #define CTG_RENT_SLOT_SECOND 1.5 @@ -38,6 +39,8 @@ extern "C" { #define CTG_ERR_CODE_TABLE_NOT_EXIST TSDB_CODE_PAR_TABLE_NOT_EXIST +#define CTG_BATCH_FETCH 1 + enum { CTG_READ = 1, CTG_WRITE, @@ -200,8 +203,20 @@ typedef struct SCatalog { SCtgRentMgmt stbRent; } SCatalog; +typedef struct SCtgBatch { + int32_t batchId; + int32_t msgType; + int32_t msgSize; + SArray* pMsgs; + SRequestConnInfo *pConn; + char dbFName[TSDB_DB_FNAME_LEN]; + SArray* pTaskIds; +} SCtgBatch; + typedef struct SCtgJob { int64_t refId; + int32_t batchId; + SHashObj* pBatchs; SArray* pTasks; int32_t taskDone; SMetaData jobRes; @@ -258,6 +273,7 @@ typedef struct SCtgTask { SRWLatch lock; SArray* pParents; SCtgSubRes subRes; + SHashObj* pBatchs; } SCtgTask; typedef int32_t (*ctgInitTaskFp)(SCtgJob*, int32_t, void*); @@ -626,6 +642,8 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, int32_t ctgGetTbCfgCb(SCtgTask *pTask); void ctgFreeHandle(SCatalog* pCatalog); +void ctgFreeBatch(SCtgBatch *pBatch); +void ctgFreeBatchs(SHashObj *pBatchs); int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst); int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput); int32_t ctgGenerateVgList(SCatalog *pCtg, SHashObj *vgHash, SArray** pList); diff --git a/source/libs/catalog/inc/ctgRemote.h b/source/libs/catalog/inc/ctgRemote.h index cd88863c1b..72ab43e085 100644 --- a/source/libs/catalog/inc/ctgRemote.h +++ b/source/libs/catalog/inc/ctgRemote.h @@ -23,8 +23,9 @@ extern "C" { typedef struct SCtgTaskCallbackParam { uint64_t queryId; int64_t refId; - uint64_t taskId; + SArray* taskId; int32_t reqType; + int32_t batchId; } SCtgTaskCallbackParam; diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 920acbac2e..65c6a0f76b 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -473,8 +473,15 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const pJob->tbCfgNum = tbCfgNum; pJob->svrVerNum = svrVerNum; - pJob->pTasks = taosArrayInit(taskNum, sizeof(SCtgTask)); +#if CTG_BATCH_FETCH + pJob->pBatchs = taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + if (NULL == pJob->pBatchs) { + ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } +#endif + pJob->pTasks = taosArrayInit(taskNum, sizeof(SCtgTask)); if (NULL == pJob->pTasks) { ctgError("taosArrayInit %d tasks failed", taskNum); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); @@ -560,7 +567,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const _return: - taosMemoryFreeClear(*job); + ctgFreeJob(*job); CTG_RET(code); } @@ -872,7 +879,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf * SVgroupInfo vgInfo = {0}; CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, ctx->pName, &vgInfo)); - ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag); + ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag); ctx->vgId = vgInfo.vgId; CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, ctx->pName, &vgInfo, NULL, pTask)); @@ -890,7 +897,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf * return TSDB_CODE_SUCCESS; } - ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(ctx->pName)); + ctgError("no tbmeta got, tbName:%s", tNameGetTableName(ctx->pName)); ctgRemoveTbMetaFromCache(pCtg, ctx->pName, false); CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST); @@ -1705,13 +1712,19 @@ int32_t ctgLaunchJob(SCtgJob *pJob) { qDebug("QID:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId); CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask)); + pTask->status = CTG_TASK_LAUNCHED; + pTask->pBatchs = pJob->pBatchs; } if (taskNum <= 0) { qDebug("QID:0x%" PRIx64 " ctg call user callback with rsp %s", pJob->queryId, tstrerror(pJob->jobResCode)); taosAsyncExec(ctgCallUserCb, pJob, NULL); +#if CTG_BATCH_FETCH + } else { + ctgLaunchBatchs(pJob->pCtg, pJob, pJob->pBatchs); +#endif } return TSDB_CODE_SUCCESS; diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index cc5dde9298..04ffbca629 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -21,6 +21,64 @@ #include "ctgRemote.h" #include "tref.h" +int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf *pMsg, int32_t rspCode) { + int32_t code = 0; + SArray* pTaskId = cbParam->taskId; + int32_t taskNum = taosArrayGetSize(pTaskId); + SDataBuf taskMsg = *pMsg; + int32_t offset = 0; + int32_t msgNum = (pMsg->pData && (pMsg->len > 0)) ? htonl(pMsg->pData) : 0; + ASSERT(taskNum == msgNum || 0 == msgNum); + + qDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId, TMSG_INFO(cbParam->reqType + 1)); + + offset += sizeof(msgNum); + SBatchRsp rsp = {0}; + SHashObj* pBatchs = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + if (NULL == pBatchs) { + ctgError("taosHashInit %d batch failed", taskNum); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + for (int32_t i = 0; i < taskNum; ++i) { + int32_t taskId = taosArrayGet(pTaskId, i); + SCtgTask *pTask = taosArrayGet(pJob->pTasks, taskId); + if (msgNum > 0) { + rsp.reqType = htonl(((char*)pMsg->pData) + offset); + offset += sizeof(rsp.reqType); + rsp.msgLen = htonl(((char*)pMsg->pData) + offset); + offset += sizeof(rsp.msgLen); + rsp.rspCode = htonl(((char*)pMsg->pData) + offset); + offset += sizeof(rsp.rspCode); + rsp.msg = ((char*)pMsg->pData) + offset; + offset += rsp.msgLen; + + taskMsg.msgType = rsp.reqType; + taskMsg.pData = rsp.msg; + taskMsg.len = rsp.msgLen; + } else { + rsp.reqType = -1; + taskMsg.msgType = -1; + taskMsg.pData = NULL; + taskMsg.len = 0; + } + + pTask->pBatchs = pBatchs; + + qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(taskMsg.msgType + 1)); + + (*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode)); + } + + CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs)); + +_return: + + ctgFreeBatchs(pBatchs); + CTG_RET(code); +} + + int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target) { int32_t code = 0; @@ -233,6 +291,11 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, break; } default: + if (TSDB_CODE_SUCCESS != rspCode) { + qError("Got error rsp, error:%s", tstrerror(rspCode)); + CTG_ERR_RET(rspCode); + } + qError("invalid req type %s", TMSG_INFO(reqType)); return TSDB_CODE_APP_ERROR; } @@ -254,12 +317,17 @@ int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { goto _return; } - SCtgTask *pTask = taosArrayGet(pJob->pTasks, cbParam->taskId); + if (TDMT_VND_BATCH_META == cbParam->reqType || TDMT_MND_BATCH_META == cbParam->reqType) { + CTG_ERR_JRET(ctgHandleBatchRsp(pJob, cbParam, pMsg, rspCode)); + } else { + int32_t taskId = taosArrayGet(cbParam->taskId, 0); + SCtgTask *pTask = taosArrayGet(pJob->pTasks, taskId); - qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(cbParam->reqType + 1)); + qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(cbParam->reqType + 1)); + + CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, cbParam->reqType, pMsg, rspCode)); + } - CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, cbParam->reqType, pMsg, rspCode)); - _return: taosMemoryFree(pMsg->pData); @@ -272,7 +340,7 @@ _return: } -int32_t ctgMakeMsgSendInfo(SCtgTask* pTask, int32_t msgType, SMsgSendInfo **pMsgSendInfo) { +int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, int32_t msgType, SMsgSendInfo **pMsgSendInfo) { int32_t code = 0; SMsgSendInfo *msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (NULL == msgSendInfo) { @@ -287,9 +355,10 @@ int32_t ctgMakeMsgSendInfo(SCtgTask* pTask, int32_t msgType, SMsgSendInfo **pMsg } param->reqType = msgType; - param->queryId = pTask->pJob->queryId; - param->refId = pTask->pJob->refId; - param->taskId = pTask->taskId; + param->queryId = pJob->queryId; + param->refId = pJob->refId; + param->taskId = pTaskId; + param->batchId = batchId; msgSendInfo->param = param; msgSendInfo->paramFreeFp = taosMemoryFree; @@ -307,12 +376,13 @@ _return: CTG_RET(code); } -int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTask* pTask, int32_t msgType, void *msg, uint32_t msgSize) { +int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob* pJob, SArray* pTaskId, + int32_t batchId, char* dbFName, int32_t vgId, int32_t msgType, void *msg, uint32_t msgSize) { int32_t code = 0; SMsgSendInfo *pMsgSendInfo = NULL; - CTG_ERR_JRET(ctgMakeMsgSendInfo(pTask, msgType, &pMsgSendInfo)); + CTG_ERR_JRET(ctgMakeMsgSendInfo(pJob, pTaskId, batchId, msgType, &pMsgSendInfo)); - ctgUpdateSendTargetInfo(pMsgSendInfo, msgType, pTask); + ctgUpdateSendTargetInfo(pMsgSendInfo, msgType, dbFName, vgId); pMsgSendInfo->requestId = pConn->requestId; pMsgSendInfo->requestObjRefId = pConn->requestObjRefId; @@ -328,19 +398,163 @@ int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTask* pTask CTG_ERR_JRET(code); } - ctgDebug("ctg req msg sent, reqId:0x%" PRIx64 ", msg type:%d, %s", pTask->pJob->queryId, msgType, TMSG_INFO(msgType)); + ctgDebug("ctg req msg sent, reqId:0x%" PRIx64 ", msg type:%d, %s", pJob->queryId, msgType, TMSG_INFO(msgType)); return TSDB_CODE_SUCCESS; _return: if (pMsgSendInfo) { - taosMemoryFreeClear(pMsgSendInfo->param); - taosMemoryFreeClear(pMsgSendInfo); + destroySendMsgInfo(pMsgSendInfo); + } + + CTG_RET(code); +} + +int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgTask* pTask, int32_t msgType, void *msg, uint32_t msgSize) { + int32_t code = 0; + SHashObj* pBatchs = pTask->pBatchs; + SCtgJob* pJob = pTask->pJob; + SCtgBatch* pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId)); + int32_t taskNum = taosArrayGetSize(pTask->pJob->pTasks); + SCtgBatch newBatch = {0}; + SBatchMsg req = {0}; + + if (NULL == pBatch) { + newBatch.pMsgs = taosArrayInit(taskNum, sizeof(SBatchMsg)); + newBatch.pTaskIds = taosArrayInit(taskNum, sizeof(int32_t)); + if (NULL == newBatch.pMsgs || NULL == newBatch.pTaskIds) { + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + + newBatch.pConn = pConn; + + req.msgType = msgType; + req.msgLen = msgSize; + req.msg = msg; + if (NULL == taosArrayPush(newBatch.pMsgs, &req)) { + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + if (NULL == taosArrayPush(newBatch.pTaskIds, &pTask->taskId)) { + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + newBatch.msgSize = sizeof(req) + msgSize - POINTER_BYTES; + + if (vgId > 0) { + if (TDMT_VND_TABLE_CFG == msgType) { + SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx; + tNameGetFullDbName(ctx->pName, newBatch.dbFName); + } else if (TDMT_VND_TABLE_META == msgType) { + SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; + tNameGetFullDbName(ctx->pName, newBatch.dbFName); + } else { + ctgError("invalid vnode msgType %d", msgType); + CTG_ERR_JRET(TSDB_CODE_APP_ERROR); + } + } + + newBatch.msgType = (vgId > 0) ? TDMT_VND_BATCH_META : TDMT_MND_BATCH_META; + newBatch.batchId = atomic_add_fetch_32(&pJob->batchId, 1); + + if (0 != taosHashPut(pBatchs, &vgId, sizeof(vgId), &newBatch, sizeof(newBatch))) { + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + + ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), newBatch->batchId, vgId); + + return TSDB_CODE_SUCCESS; + } + + req.msgType = msgType; + req.msgLen = msgSize; + req.msg = msg; + if (NULL == taosArrayPush(pBatch->pMsgs, &req)) { + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + if (NULL == taosArrayPush(pBatch->pTaskIds, &pTask->taskId)) { + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + pBatch->msgSize += sizeof(req) + msgSize - POINTER_BYTES; + + if (vgId > 0) { + if (TDMT_VND_TABLE_CFG == msgType) { + SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx; + tNameGetFullDbName(ctx->pName, newBatch.dbFName); + } else if (TDMT_VND_TABLE_META == msgType) { + SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; + tNameGetFullDbName(ctx->pName, newBatch.dbFName); + } else { + ctgError("invalid vnode msgType %d", msgType); + CTG_ERR_JRET(TSDB_CODE_APP_ERROR); + } + } + + ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pBatch->batchId, vgId); + + return TSDB_CODE_SUCCESS; + +_return: + + ctgFreeBatch(&newBatch); + taosMemoryFree(msg); + + return code; +} + +int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, void** msg) { + *msg = taosMemoryMalloc(pBatch->msgSize); + if (NULL == (*msg)) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + int32_t offset = 0; + int32_t num = taosArrayGetSize(pBatch->pMsgs); + *(int32_t*)((char*)(*msg) + offset) = htonl(num); + offset += sizeof(num); + + for (int32_t i = 0; i < num; ++i) { + SBatchMsg* pReq = taosArrayGet(pBatch->pMsgs, i); + *(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgType); + offset += sizeof(pReq->msgType); + *(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgLen); + offset += sizeof(pReq->msgLen); + memcpy((char*)(*msg) + offset, pReq->msg, pReq->msgLen); + offset += pReq->msgLen; } + ASSERT(pBatch->msgSize == offset); + + return TSDB_CODE_SUCCESS; +} + +int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob *pJob, SHashObj* pBatchs) { + int32_t code = 0; + void* msg = NULL; + void* p = taosHashIterate(pBatchs, NULL); + while (NULL != p) { + size_t len = 0; + int32_t* vgId = taosHashGetKey(p, &len); + SCtgBatch* pBatch = (SCtgBatch*)p; + + CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, &msg)); + CTG_ERR_JRET(ctgAsyncSendMsg(pCtg, pBatch->pConn, pJob, pBatch->pTaskIds, pBatch->batchId, + pBatch->dbFName, *vgId, pBatch->msgType, msg, pBatch->msgSize)); + + p = taosHashIterate(pBatchs, p); + } + + return TSDB_CODE_SUCCESS; + +_return: + + if (p) { + taosHashCancelIterate(pBatchs, p); + } + taosMemoryFree(msg); + CTG_RET(code); } + int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray *out, SCtgTask* pTask) { char *msg = NULL; int32_t msgLen = 0; @@ -361,7 +575,14 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, NULL)); - CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen)); + + SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); + if (NULL == pTaskId) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + taosArrayPush(pTaskId, &pTask->taskId); + + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); } SRpcMsg rpcMsg = { @@ -396,7 +617,14 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray if (pTask) { CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, NULL)); - CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen)); + + SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); + if (NULL == pTaskId) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + taosArrayPush(pTaskId, &pTask->taskId); + + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); } SRpcMsg rpcMsg = { @@ -436,8 +664,14 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, input->db)); + + SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); + if (NULL == pTaskId) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + taosArrayPush(pTaskId, &pTask->taskId); - CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen)); + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); } SRpcMsg rpcMsg = { @@ -476,8 +710,14 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)dbFName)); + + SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); + if (NULL == pTaskId) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + taosArrayPush(pTaskId, &pTask->taskId); - CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen)); + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); } SRpcMsg rpcMsg = { @@ -517,7 +757,13 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const } CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)indexName)); - CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen)); + SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); + if (NULL == pTaskId) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + taosArrayPush(pTaskId, &pTask->taskId); + + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); } SRpcMsg rpcMsg = { @@ -560,7 +806,13 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)tbFName)); - CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen)); + SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); + if (NULL == pTaskId) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + taosArrayPush(pTaskId, &pTask->taskId); + + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); } SRpcMsg rpcMsg = { @@ -600,7 +852,13 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ch } CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)funcName)); - CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen)); + SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); + if (NULL == pTaskId) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + taosArrayPush(pTaskId, &pTask->taskId); + + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); } SRpcMsg rpcMsg = { @@ -640,7 +898,13 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const } CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)user)); - CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen)); + SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); + if (NULL == pTaskId) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + taosArrayPush(pTaskId, &pTask->taskId); + + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); } SRpcMsg rpcMsg = { @@ -685,7 +949,13 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char } CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, tbFName)); - CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen)); + SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); + if (NULL == pTaskId) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + taosArrayPush(pTaskId, &pTask->taskId); + + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); } SRpcMsg rpcMsg = { @@ -744,7 +1014,21 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa .requestId = pConn->requestId, .requestObjRefId = pConn->requestObjRefId, .mgmtEps = vgroupInfo->epSet}; - CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask, reqType, msg, msgLen)); + +#if CTG_BATCH_FETCH + CTG_RET(ctgAddVnodeBatch(pCtg, vgroupInfo->vgId, &vConn, pTask, reqType, msg, msgLen)); +#else + SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; + char dbFName[TSDB_DB_FNAME_LEN]; + tNameGetFullDbName(ctx->pName, dbFName); + SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); + if (NULL == pTaskId) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + taosArrayPush(pTaskId, &pTask->taskId); + + CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, dbFName, ctx->vgId, reqType, msg, msgLen)); +#endif } SRpcMsg rpcMsg = { @@ -791,7 +1075,20 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S .requestId = pConn->requestId, .requestObjRefId = pConn->requestObjRefId, .mgmtEps = vgroupInfo->epSet}; - CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask, reqType, msg, msgLen)); +#if CTG_BATCH_FETCH + CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, pTask, reqType, msg, msgLen)); +#else + SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx; + char dbFName[TSDB_DB_FNAME_LEN]; + tNameGetFullDbName(ctx->pName, dbFName); + SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); + if (NULL == pTaskId) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + taosArrayPush(pTaskId, &pTask->taskId); + + CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, dbFName, ctx->pVgInfo->vgId, reqType, msg, msgLen)); +#endif } SRpcMsg rpcMsg = { @@ -833,7 +1130,13 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S if (pTask) { CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, (char*)tbFName)); - CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen)); + SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); + if (NULL == pTaskId) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + taosArrayPush(pTaskId, &pTask->taskId); + + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); } SRpcMsg rpcMsg = { @@ -869,7 +1172,13 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **ou if (pTask) { CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, NULL)); - CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen)); + SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); + if (NULL == pTaskId) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + taosArrayPush(pTaskId, &pTask->taskId); + + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); } SRpcMsg rpcMsg = { diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index 1f0f074a0f..8d33e5596f 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -19,6 +19,29 @@ #include "catalogInt.h" #include "systable.h" + +void ctgFreeBatch(SCtgBatch *pBatch) { + if (NULL == pBatch) { + return; + } + + taosArrayDestroy(pBatch->pMsgs); + taosArrayDestroy(pBatch->pTaskIds); +} + +void ctgFreeBatchs(SHashObj *pBatchs) { + void* p = taosHashIterate(pBatchs, NULL); + while (NULL != p) { + SCtgBatch* pBatch = (SCtgBatch*)p; + + ctgFreeBatch(pBatch); + + p = taosHashIterate(pBatchs, p); + } + + taosHashCleanup(pBatchs); +} + char *ctgTaskTypeStr(CTG_TASK_TYPE type) { switch (type) { case CTG_TASK_GET_QNODE: @@ -612,6 +635,7 @@ void ctgFreeJob(void* job) { uint64_t qid = pJob->queryId; ctgFreeTasks(pJob->pTasks); + ctgFreeBatchs(pJob->pBatchs); ctgFreeSMetaData(&pJob->jobRes); @@ -867,14 +891,10 @@ int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes) { } -int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, SCtgTask* pTask) { - if (msgType == TDMT_VND_TABLE_META) { - SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; - char dbFName[TSDB_DB_FNAME_LEN]; - tNameGetFullDbName(ctx->pName, dbFName); - +int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, char* dbFName, int32_t vgId) { + if (msgType == TDMT_VND_TABLE_META || msgType == TDMT_VND_TABLE_CFG || msgType == TDMT_VND_BATCH_META) { pMsgSendInfo->target.type = TARGET_TYPE_VNODE; - pMsgSendInfo->target.vgId = ctx->vgId; + pMsgSendInfo->target.vgId = vgId; pMsgSendInfo->target.dbFName = strdup(dbFName); } else { pMsgSendInfo->target.type = TARGET_TYPE_MNODE; diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index ed8786170d..78ecde6003 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -636,6 +636,7 @@ void initQueryModuleMsgHandle() { queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryProcessGetTbIndexRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_CFG)] = queryProcessGetTbCfgRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_CFG)] = queryProcessGetTbCfgRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_BATCH_META)] = queryProcessGetBatchMetaRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_SERVER_VERSION)] = queryProcessGetSerVerRsp; } -- GitLab