提交 648a7a12 编写于 作者: D dapan1121

feat: support mnd meta batch fetching

上级 602d2197
...@@ -184,6 +184,7 @@ SArray *mmGetMsgHandles() { ...@@ -184,6 +184,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_META, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_META, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_BATCH_META, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_CFG, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_CFG, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
...@@ -63,6 +63,104 @@ int32_t mndProcessQueryMsg(SRpcMsg *pMsg) { ...@@ -63,6 +63,104 @@ int32_t mndProcessQueryMsg(SRpcMsg *pMsg) {
return code; return code;
} }
int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
int32_t code = 0;
int32_t offset = 0;
int32_t rspSize = 0;
SBatchReq *batchReq = (SBatchReq*)pMsg->pCont;
int32_t msgNum = ntohl(batchReq->msgNum);
offset += sizeof(SBatchReq);
SBatchMsg req = {0};
SBatchRsp rsp = {0};
SRpcMsg reqMsg = *pMsg;
SRpcMsg rspMsg = {0};
void* pRsp = NULL;
SMnode *pMnode = pMsg->info.node;
SArray* batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp));
if (NULL == batchRsp) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
for (int32_t i = 0; i < msgNum; ++i) {
req.msgType = ntohl(*(int32_t*)((char*)pMsg->pCont + offset));
offset += sizeof(req.msgType);
req.msgLen = ntohl(*(int32_t*)((char*)pMsg->pCont + offset));
offset += sizeof(req.msgLen);
req.msg = (char*)pMsg->pCont + offset;
offset += req.msgLen;
reqMsg.msgType = req.msgType;
reqMsg.pCont = req.msg;
reqMsg.contLen = req.msgLen;
reqMsg.info.rsp = NULL;
reqMsg.info.rspLen = 0;
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(req.msgType)];
if (fp == NULL) {
mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
return -1;
}
code = (*fp)(&reqMsg);
rsp.reqType = reqMsg.msgType;
rsp.msgLen = reqMsg.info.rspLen;
rsp.rspCode = code;
rsp.msg = reqMsg.info.rsp;
taosArrayPush(batchRsp, &rsp);
rspSize += sizeof(rsp) + rsp.msgLen - POINTER_BYTES;
}
rspSize += sizeof(int32_t);
offset = 0;
pRsp = rpcMallocCont(rspSize);
if (pRsp == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
*(int32_t*)((char*)pRsp + offset) = htonl(msgNum);
offset += sizeof(msgNum);
for (int32_t i = 0; i < msgNum; ++i) {
SBatchRsp *p = taosArrayGet(batchRsp, i);
*(int32_t*)((char*)pRsp + offset) = htonl(p->reqType);
offset += sizeof(p->reqType);
*(int32_t*)((char*)pRsp + offset) = htonl(p->msgLen);
offset += sizeof(p->msgLen);
*(int32_t*)((char*)pRsp + offset) = htonl(p->rspCode);
offset += sizeof(p->rspCode);
memcpy((char*)pRsp + offset, p->msg, p->msgLen);
offset += p->msgLen;
rpcFreeCont(p->msg);
}
taosArrayDestroy(batchRsp);
batchRsp = NULL;
_exit:
pMsg->info.rsp = pRsp;
pMsg->info.rspLen = rspSize;
if (code) {
mError("mnd get batch meta failed cause of %s", tstrerror(code));
}
taosArrayDestroyEx(batchRsp, tFreeSBatchRsp);
return code;
}
int32_t mndInitQuery(SMnode *pMnode) { int32_t mndInitQuery(SMnode *pMnode) {
if (qWorkerInit(NODE_TYPE_MNODE, MNODE_HANDLE, NULL, (void **)&pMnode->pQuery, &pMnode->msgCb) != 0) { if (qWorkerInit(NODE_TYPE_MNODE, MNODE_HANDLE, NULL, (void **)&pMnode->pQuery, &pMnode->msgCb) != 0) {
mError("failed to init qworker in mnode since %s", terrstr()); mError("failed to init qworker in mnode since %s", terrstr());
...@@ -76,6 +174,7 @@ int32_t mndInitQuery(SMnode *pMnode) { ...@@ -76,6 +174,7 @@ int32_t mndInitQuery(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_SCH_MERGE_FETCH, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_SCH_MERGE_FETCH, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_SCH_DROP_TASK, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_SCH_DROP_TASK, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_HEARTBEAT, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_HEARTBEAT, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_MND_BATCH_META, mndProcessBatchMetaMsg);
return 0; return 0;
} }
......
...@@ -349,14 +349,14 @@ _exit: ...@@ -349,14 +349,14 @@ _exit:
rspMsg.msgType = pMsg->msgType; rspMsg.msgType = pMsg->msgType;
if (code) { if (code) {
qError("get batch meta failed cause of %s", tstrerror(code)); qError("vnd get batch meta failed cause of %s", tstrerror(code));
} }
taosArrayDestroyEx(batchRsp, tFreeSBatchRsp); taosArrayDestroyEx(batchRsp, tFreeSBatchRsp);
tmsgSendRsp(&rspMsg); tmsgSendRsp(&rspMsg);
return TSDB_CODE_SUCCESS; return code;
} }
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
......
...@@ -251,6 +251,16 @@ typedef struct SCtgMsgCtx { ...@@ -251,6 +251,16 @@ typedef struct SCtgMsgCtx {
char* target; char* target;
} SCtgMsgCtx; } SCtgMsgCtx;
typedef struct SCtgTaskCallbackParam {
uint64_t queryId;
int64_t refId;
SArray* taskId;
int32_t reqType;
int32_t batchId;
} SCtgTaskCallbackParam;
typedef struct SCtgTask SCtgTask; typedef struct SCtgTask SCtgTask;
typedef int32_t (*ctgSubTaskCbFp)(SCtgTask*); typedef int32_t (*ctgSubTaskCbFp)(SCtgTask*);
...@@ -643,6 +653,7 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, ...@@ -643,6 +653,7 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp,
int32_t ctgGetTbCfgCb(SCtgTask *pTask); int32_t ctgGetTbCfgCb(SCtgTask *pTask);
void ctgFreeHandle(SCatalog* pCatalog); void ctgFreeHandle(SCatalog* pCatalog);
void ctgFreeMsgSendParam(void* param);
void ctgFreeBatch(SCtgBatch *pBatch); void ctgFreeBatch(SCtgBatch *pBatch);
void ctgFreeBatchs(SHashObj *pBatchs); void ctgFreeBatchs(SHashObj *pBatchs);
int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst); int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst);
......
...@@ -20,13 +20,6 @@ ...@@ -20,13 +20,6 @@
extern "C" { extern "C" {
#endif #endif
typedef struct SCtgTaskCallbackParam {
uint64_t queryId;
int64_t refId;
SArray* taskId;
int32_t reqType;
int32_t batchId;
} SCtgTaskCallbackParam;
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -783,7 +783,8 @@ int32_t ctgCallSubCb(SCtgTask *pTask) { ...@@ -783,7 +783,8 @@ int32_t ctgCallSubCb(SCtgTask *pTask) {
pParent->subRes.code = code; pParent->subRes.code = code;
} }
} }
pParent->pBatchs = pTask->pBatchs;
CTG_ERR_JRET(pParent->subRes.fp(pParent)); CTG_ERR_JRET(pParent->subRes.fp(pParent));
} }
...@@ -1660,6 +1661,7 @@ int32_t ctgSetSubTaskCb(SCtgTask *pSub, SCtgTask *pTask) { ...@@ -1660,6 +1661,7 @@ int32_t ctgSetSubTaskCb(SCtgTask *pSub, SCtgTask *pTask) {
if (CTG_TASK_DONE == pSub->status) { if (CTG_TASK_DONE == pSub->status) {
pTask->subRes.code = pSub->code; pTask->subRes.code = pSub->code;
CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].cloneFp)(pSub, &pTask->subRes.res)); CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].cloneFp)(pSub, &pTask->subRes.res));
pTask->pBatchs = pSub->pBatchs;
CTG_ERR_JRET(pTask->subRes.fp(pTask)); CTG_ERR_JRET(pTask->subRes.fp(pTask));
} else { } else {
if (NULL == pSub->pParents) { if (NULL == pSub->pParents) {
...@@ -1697,6 +1699,7 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, ...@@ -1697,6 +1699,7 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp,
CTG_ERR_RET(ctgSetSubTaskCb(pSub, pTask)); CTG_ERR_RET(ctgSetSubTaskCb(pSub, pTask));
if (newTask) { if (newTask) {
pSub->pBatchs = pTask->pBatchs;
CTG_ERR_RET((*gCtgAsyncFps[pSub->type].launchFp)(pSub)); CTG_ERR_RET((*gCtgAsyncFps[pSub->type].launchFp)(pSub));
pSub->status = CTG_TASK_LAUNCHED; pSub->status = CTG_TASK_LAUNCHED;
} }
......
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
#include "tname.h" #include "tname.h"
#include "catalogInt.h" #include "catalogInt.h"
#include "systable.h" #include "systable.h"
#include "ctgRemote.h"
#include "tref.h" #include "tref.h"
int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf *pMsg, int32_t rspCode) {
...@@ -361,7 +360,7 @@ int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, int3 ...@@ -361,7 +360,7 @@ int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, int3
SMsgSendInfo *msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); SMsgSendInfo *msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == msgSendInfo) { if (NULL == msgSendInfo) {
qError("calloc %d failed", (int32_t)sizeof(SMsgSendInfo)); qError("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
CTG_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SCtgTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SCtgTaskCallbackParam)); SCtgTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SCtgTaskCallbackParam));
...@@ -377,7 +376,7 @@ int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, int3 ...@@ -377,7 +376,7 @@ int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, int3
param->batchId = batchId; param->batchId = batchId;
msgSendInfo->param = param; msgSendInfo->param = param;
msgSendInfo->paramFreeFp = taosMemoryFree; msgSendInfo->paramFreeFp = ctgFreeMsgSendParam;
msgSendInfo->fp = ctgHandleMsgCallback; msgSendInfo->fp = ctgHandleMsgCallback;
*pMsgSendInfo = msgSendInfo; *pMsgSendInfo = msgSendInfo;
...@@ -386,8 +385,8 @@ int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, int3 ...@@ -386,8 +385,8 @@ int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, int3
_return: _return:
taosMemoryFree(param); taosArrayDestroy(pTaskId);
taosMemoryFree(msgSendInfo); destroySendMsgInfo(msgSendInfo);
CTG_RET(code); CTG_RET(code);
} }
...@@ -543,6 +542,8 @@ int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) { ...@@ -543,6 +542,8 @@ int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) {
ASSERT(pBatch->msgSize == offset); ASSERT(pBatch->msgSize == offset);
qDebug("batch req %d to vg %d msg built with %d meta reqs", pBatch->batchId, vgId, num);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -558,9 +559,11 @@ int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob *pJob, SHashObj* pBatchs) { ...@@ -558,9 +559,11 @@ int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob *pJob, SHashObj* pBatchs) {
ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId); ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId);
CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg)); CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg));
CTG_ERR_JRET(ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId,
pBatch->dbFName, *vgId, pBatch->msgType, msg, pBatch->msgSize)); pBatch->dbFName, *vgId, pBatch->msgType, msg, pBatch->msgSize);
pBatch->pTaskIds = NULL;
CTG_ERR_JRET(code);
p = taosHashIterate(pBatchs, p); p = taosHashIterate(pBatchs, p);
} }
...@@ -598,6 +601,9 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray ...@@ -598,6 +601,9 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray
} }
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, NULL)); CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, NULL));
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
#else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
...@@ -605,6 +611,7 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray ...@@ -605,6 +611,7 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif
} }
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
...@@ -640,6 +647,9 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray ...@@ -640,6 +647,9 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray
if (pTask) { if (pTask) {
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, NULL)); CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, NULL));
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
#else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
...@@ -647,6 +657,7 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray ...@@ -647,6 +657,7 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif
} }
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
...@@ -687,6 +698,9 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU ...@@ -687,6 +698,9 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU
} }
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, input->db)); CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, input->db));
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
#else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
...@@ -694,6 +708,7 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU ...@@ -694,6 +708,7 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif
} }
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
...@@ -733,6 +748,9 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char ...@@ -733,6 +748,9 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char
} }
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)dbFName)); CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)dbFName));
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
#else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
...@@ -740,6 +758,7 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char ...@@ -740,6 +758,7 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif
} }
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
...@@ -778,7 +797,10 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ...@@ -778,7 +797,10 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)indexName)); CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)indexName));
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
#else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
...@@ -786,6 +808,7 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ...@@ -786,6 +808,7 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif
} }
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
...@@ -827,7 +850,10 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n ...@@ -827,7 +850,10 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n
} }
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)tbFName)); CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)tbFName));
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
#else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
...@@ -835,6 +861,7 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n ...@@ -835,6 +861,7 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif
} }
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
...@@ -873,7 +900,10 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ch ...@@ -873,7 +900,10 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ch
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)funcName)); CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)funcName));
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
#else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
...@@ -881,6 +911,7 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ch ...@@ -881,6 +911,7 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ch
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif
} }
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
...@@ -919,7 +950,10 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ...@@ -919,7 +950,10 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)user)); CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)user));
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
#else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
...@@ -927,6 +961,7 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ...@@ -927,6 +961,7 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif
} }
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
...@@ -970,7 +1005,9 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char ...@@ -970,7 +1005,9 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, tbFName)); CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, tbFName));
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
#else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
...@@ -978,6 +1015,7 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char ...@@ -978,6 +1015,7 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif
} }
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
...@@ -1151,7 +1189,9 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S ...@@ -1151,7 +1189,9 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S
if (pTask) { if (pTask) {
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, (char*)tbFName)); CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, (char*)tbFName));
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
#else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
...@@ -1159,6 +1199,7 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S ...@@ -1159,6 +1199,7 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif
} }
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
...@@ -1193,7 +1234,10 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **ou ...@@ -1193,7 +1234,10 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **ou
if (pTask) { if (pTask) {
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, NULL)); CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, NULL));
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
#else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
...@@ -1201,6 +1245,7 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **ou ...@@ -1201,6 +1245,7 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **ou
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif
} }
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
......
...@@ -19,6 +19,16 @@ ...@@ -19,6 +19,16 @@
#include "catalogInt.h" #include "catalogInt.h"
#include "systable.h" #include "systable.h"
void ctgFreeMsgSendParam(void* param) {
if (NULL == param) {
return;
}
SCtgTaskCallbackParam* pParam = (SCtgTaskCallbackParam*)param;
taosArrayDestroy(pParam->taskId);
taosMemoryFree(param);
}
void ctgFreeBatch(SCtgBatch *pBatch) { void ctgFreeBatch(SCtgBatch *pBatch) {
if (NULL == pBatch) { if (NULL == pBatch) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册