From 7729ce2eb313dd00a978eba3935978b7fa321984 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 16 May 2022 16:13:51 +0800 Subject: [PATCH] refactor: adjust SRpcMsg --- include/common/tmsgcb.h | 2 +- source/common/src/tmsgcb.c | 37 +++---------- source/dnode/mgmt/mgmt_bnode/src/bmWorker.c | 21 +++---- source/dnode/mgmt/mgmt_qnode/src/qmWorker.c | 4 +- source/dnode/mgmt/mgmt_snode/src/smWorker.c | 4 +- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 12 +--- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 10 +--- source/dnode/mnode/impl/inc/mndDef.h | 48 ++++++++-------- source/dnode/mnode/impl/src/mndTrans.c | 18 ++---- source/dnode/vnode/src/vnd/vnodeQuery.c | 4 +- source/libs/qworker/inc/qworkerInt.h | 28 ++++------ source/libs/qworker/inc/qworkerMsg.h | 20 +++---- source/libs/qworker/src/qworker.c | 14 ++--- source/libs/qworker/src/qworkerMsg.c | 55 +++++++------------ 14 files changed, 104 insertions(+), 173 deletions(-) diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index a484c2acc9..d507372d56 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -41,7 +41,7 @@ typedef int32_t (*PutToQueueFp)(void *pMgmt, SRpcMsg* pReq); typedef int32_t (*GetQueueSizeFp)(void *pMgmt, int32_t vgId, EQueueType qtype); typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, const SEpSet* epSet, SRpcMsg* pReq); typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); -typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, const SRpcMsg* pRsp); +typedef void (*SendRspFp)(const SRpcMsg* pRsp); typedef void (*SendMnodeRecvFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq, SRpcMsg* pRsp); typedef void (*SendRedirectRspFp)(SMgmtWrapper* pWrapper, const SRpcMsg* pRsp, const SEpSet* pNewEpSet); typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg* pMsg); diff --git a/source/common/src/tmsgcb.c b/source/common/src/tmsgcb.c index 78b70c9288..f2655fb03c 100644 --- a/source/common/src/tmsgcb.c +++ b/source/common/src/tmsgcb.c @@ -19,49 +19,30 @@ static SMsgCb tsDefaultMsgCb; -void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { - // if (tsDefaultMsgCb.pWrapper == NULL) { - tsDefaultMsgCb = *pMsgCb; - //} -} +void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { tsDefaultMsgCb = *pMsgCb; } int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) { + // cannot be empty, but not checked for faster detect PutToQueueFp fp = pMsgCb->queueFps[qtype]; - if (fp != NULL) { - return (*fp)(pMsgCb->pMgmt, pReq); - } else { - terrno = TSDB_CODE_INVALID_PTR; - return -1; - } + return (*fp)(pMsgCb->pMgmt, pReq); } int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) { + // cannot be empty, but not checked for faster detect GetQueueSizeFp fp = pMsgCb->qsizeFp; - if (fp != NULL) { - return (*fp)(pMsgCb->pMgmt, vgId, qtype); - } else { - terrno = TSDB_CODE_INVALID_PTR; - return -1; - } + return (*fp)(pMsgCb->pMgmt, vgId, qtype); } int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq) { + // cannot be empty, but not checked for faster detect SendReqFp fp = pMsgCb->sendReqFp; - if (fp != NULL) { - return (*fp)(pMsgCb->pWrapper, epSet, pReq); - } else { - terrno = TSDB_CODE_INVALID_PTR; - return -1; - } + return (*fp)(pMsgCb->pWrapper, epSet, pReq); } void tmsgSendRsp(SRpcMsg* pRsp) { + // cannot be empty, but not checked for faster detect SendRspFp fp = tsDefaultMsgCb.sendRspFp; - if (fp != NULL) { - return (*fp)(tsDefaultMsgCb.pWrapper, pRsp); - } else { - terrno = TSDB_CODE_INVALID_PTR; - } + return (*fp)(pRsp); } void tmsgSendRedirectRsp(SRpcMsg* pRsp, const SEpSet* pNewEpSet) { diff --git a/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c b/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c index b8400e1197..d97949d3f7 100644 --- a/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c +++ b/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c @@ -17,13 +17,8 @@ #include "bmInt.h" static void bmSendErrorRsp(SRpcMsg *pMsg, int32_t code) { - SRpcMsg rpcRsp = { - .info.handle = pMsg->info.handle, - .info.ahandle = pMsg->info.ahandle, - .code = code, - .info.refId = pMsg->info.refId, - }; - tmsgSendRsp(&rpcRsp); + SRpcMsg rsp = {.code = code, .info = pMsg->info}; + tmsgSendRsp(&rsp); dTrace("msg:%p, is freed", pMsg); rpcFreeCont(pMsg->pCont); @@ -41,12 +36,12 @@ static void bmSendErrorRsps(STaosQall *qall, int32_t numOfMsgs, int32_t code) { } static inline void bmSendRsp(SRpcMsg *pMsg, int32_t code) { - SRpcMsg rsp = {.info.handle = pMsg->info.handle, - .info.ahandle = pMsg->info.ahandle, - .info.refId = pMsg->info.refId, - .code = code, - .pCont = pMsg->info.rsp, - .contLen = pMsg->info.rspLen,}; + SRpcMsg rsp = { + .code = code, + .info = pMsg->info, + .pCont = pMsg->info.rsp, + .contLen = pMsg->info.rspLen, + }; tmsgSendRsp(&rsp); } diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c index f0f7a07f44..d5c82a043c 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c @@ -18,10 +18,8 @@ static inline void qmSendRsp(SRpcMsg *pMsg, int32_t code) { SRpcMsg rsp = { - .info.handle = pMsg->info.handle, - .info.ahandle = pMsg->info.ahandle, - .info.refId = pMsg->info.refId, .code = code, + .info = pMsg->info, .pCont = pMsg->info.rsp, .contLen = pMsg->info.rspLen, }; diff --git a/source/dnode/mgmt/mgmt_snode/src/smWorker.c b/source/dnode/mgmt/mgmt_snode/src/smWorker.c index 1204e6884c..10bf6ff9bf 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smWorker.c +++ b/source/dnode/mgmt/mgmt_snode/src/smWorker.c @@ -18,10 +18,8 @@ static inline void smSendRsp(SRpcMsg *pMsg, int32_t code) { SRpcMsg rsp = { - .info.handle = pMsg->info.handle, - .info.ahandle = pMsg->info.ahandle, - .info.refId = pMsg->info.refId, .code = code, + .info = pMsg->info, .pCont = pMsg->info.rsp, .contLen = pMsg->info.rspLen, }; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 813c0ed0fb..077b691462 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -21,10 +21,8 @@ static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) { SRpcMsg rsp = { - .info.handle = pMsg->info.handle, - .info.ahandle = pMsg->info.ahandle, - .info.refId = pMsg->info.refId, .code = code, + .info = pMsg->info, .pCont = pMsg->info.rsp, .contLen = pMsg->info.rspLen, }; @@ -124,9 +122,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO pMsg = *(SRpcMsg **)taosArrayGet(pArray, i); pRpc = pMsg; - rsp.info.ahandle = pRpc->info.ahandle; - rsp.info.handle = pRpc->info.handle; - rsp.info.refId = pRpc->info.refId; + rsp.info = pRpc->info; rsp.pCont = NULL; rsp.contLen = 0; @@ -193,9 +189,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO // if leader, send response if (pMsg->info.handle != NULL && pMsg->info.ahandle != NULL) { - rsp.info.ahandle = pMsg->info.ahandle; - rsp.info.handle = pMsg->info.handle; - rsp.info.refId = pMsg->info.refId; + rsp.info = pMsg->info; tmsgSendRsp(&rsp); } diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 6afe4c87e3..29dbed36c5 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -172,12 +172,7 @@ _OVER: code = TSDB_CODE_NODE_REDIRECT; } } - SRpcMsg rspMsg = { - .info.handle = pRpc->info.handle, - .code = code, - .info.ahandle = pRpc->info.ahandle, - .info.refId = pRpc->info.refId, - }; + SRpcMsg rspMsg = {.code = code, .info = pRpc->info}; tmsgSendRsp(&rspMsg); } @@ -282,7 +277,8 @@ static inline int32_t dmSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SR return 0; } -static inline void dmSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { +static inline void dmSendRsp(const SRpcMsg *pRsp) { + SMgmtWrapper *pWrapper = pRsp->info.wrapper; if (InChildProc(pWrapper->proc.ptype)) { dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, DND_FUNC_RSP); } else { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 5eb6866387..e360f52993 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -144,31 +144,29 @@ typedef enum { } ECsmUpdateType; typedef struct { - int32_t id; - ETrnStage stage; - ETrnPolicy policy; - ETrnType type; - int32_t code; - int32_t failedTimes; - void* rpcHandle; - void* rpcAHandle; - int64_t rpcRefId; - void* rpcRsp; - int32_t rpcRspLen; - SArray* redoLogs; - SArray* undoLogs; - SArray* commitLogs; - SArray* redoActions; - SArray* undoActions; - int64_t createdTime; - int64_t lastExecTime; - int64_t dbUid; - char dbname[TSDB_DB_FNAME_LEN]; - char lastError[TSDB_TRANS_ERROR_LEN]; - int32_t startFunc; - int32_t stopFunc; - int32_t paramLen; - void* param; + int32_t id; + ETrnStage stage; + ETrnPolicy policy; + ETrnType type; + int32_t code; + int32_t failedTimes; + SRpcHandleInfo rpcInfo; + void* rpcRsp; + int32_t rpcRspLen; + SArray* redoLogs; + SArray* undoLogs; + SArray* commitLogs; + SArray* redoActions; + SArray* undoActions; + int64_t createdTime; + int64_t lastExecTime; + int64_t dbUid; + char dbname[TSDB_DB_FNAME_LEN]; + char lastError[TSDB_TRANS_ERROR_LEN]; + int32_t startFunc; + int32_t stopFunc; + int32_t paramLen; + void* param; } STrans; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 43e0144742..0bc9eb5378 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -563,9 +563,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S pTrans->policy = policy; pTrans->type = type; pTrans->createdTime = taosGetTimestampMs(); - pTrans->rpcHandle = pReq->info.handle; - pTrans->rpcAHandle = pReq->info.ahandle; - pTrans->rpcRefId = pReq->info.refId; + pTrans->rpcInfo = pReq->info; pTrans->redoLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *)); pTrans->undoLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *)); pTrans->commitLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *)); @@ -783,9 +781,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { return -1; } - pNew->rpcHandle = pTrans->rpcHandle; - pNew->rpcAHandle = pTrans->rpcAHandle; - pNew->rpcRefId = pTrans->rpcRefId; + pNew->rpcInfo = pTrans->rpcInfo; pNew->rpcRsp = pTrans->rpcRsp; pNew->rpcRspLen = pTrans->rpcRspLen; pTrans->rpcRsp = NULL; @@ -839,7 +835,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { } } - if (sendRsp && pTrans->rpcHandle != NULL) { + if (sendRsp && pTrans->rpcInfo.handle != NULL) { void *rpcCont = rpcMallocCont(pTrans->rpcRspLen); if (rpcCont != NULL) { memcpy(rpcCont, pTrans->rpcRsp, pTrans->rpcRspLen); @@ -847,17 +843,15 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { taosMemoryFree(pTrans->rpcRsp); mDebug("trans:%d, send rsp, code:0x%04x stage:%d app:%p", pTrans->id, code & 0xFFFF, pTrans->stage, - pTrans->rpcAHandle); + pTrans->rpcInfo.ahandle); SRpcMsg rspMsg = { - .info.handle = pTrans->rpcHandle, - .info.ahandle = pTrans->rpcAHandle, - .info.refId = pTrans->rpcRefId, + .info = pTrans->rpcInfo, .code = code, .pCont = rpcCont, .contLen = pTrans->rpcRspLen, }; tmsgSendRsp(&rspMsg); - pTrans->rpcHandle = NULL; + pTrans->rpcInfo.handle = NULL; pTrans->rpcRsp = NULL; pTrans->rpcRspLen = 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index b994b0f8cb..3b47b90254 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -107,9 +107,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp); _exit: - rpcMsg.info.handle = pMsg->info.handle; - rpcMsg.info.ahandle = pMsg->info.ahandle; - rpcMsg.info.refId = pMsg->info.refId; + rpcMsg.info = pMsg->info; rpcMsg.pCont = pRsp; rpcMsg.contLen = rspLen; rpcMsg.code = code; diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index 3ecf987811..bcc17a9ae9 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -27,6 +27,8 @@ extern "C" { #include "tref.h" #include "plannodes.h" +#include "trpc.h" + #define QW_DEFAULT_SCHEDULER_NUMBER 10000 #define QW_DEFAULT_TASK_NUMBER 10000 #define QW_DEFAULT_SCH_TASK_NUMBER 10000 @@ -74,18 +76,12 @@ typedef struct SQWDebug { bool dumpEnable; } SQWDebug; -typedef struct SQWConnInfo { - void * handle; - void * ahandle; - int64_t refId; -} SQWConnInfo; - typedef struct SQWMsg { - void * node; - int32_t code; - char * msg; - int32_t msgLen; - SQWConnInfo connInfo; + void *node; + int32_t code; + char *msg; + int32_t msgLen; + SRpcHandleInfo connInfo; } SQWMsg; typedef struct SQWHbParam { @@ -96,7 +92,7 @@ typedef struct SQWHbParam { typedef struct SQWHbInfo { SSchedulerHbRsp rsp; - SQWConnInfo connInfo; + SRpcHandleInfo connInfo; } SQWHbInfo; typedef struct SQWPhaseInput { @@ -127,8 +123,8 @@ typedef struct SQWTaskCtx { bool queryInQueue; int32_t rspCode; - SQWConnInfo ctrlConnInfo; - SQWConnInfo dataConnInfo; + SRpcHandleInfo ctrlConnInfo; + SRpcHandleInfo dataConnInfo; int8_t events[QW_EVENT_MAX]; @@ -140,10 +136,10 @@ typedef struct SQWTaskCtx { typedef struct SQWSchStatus { int32_t lastAccessTs; // timestamp in second SRWLatch hbConnLock; - SQWConnInfo hbConnInfo; + SRpcHandleInfo hbConnInfo; SQueryNodeEpId hbEpId; SRWLatch tasksLock; - SHashObj * tasksHash; // key:queryId+taskId, value: SQWTaskStatus + SHashObj *tasksHash; // key:queryId+taskId, value: SQWTaskStatus } SQWSchStatus; // Qnode/Vnode level task management diff --git a/source/libs/qworker/inc/qworkerMsg.h b/source/libs/qworker/inc/qworkerMsg.h index fbd9ce91bc..93994c8287 100644 --- a/source/libs/qworker/inc/qworkerMsg.h +++ b/source/libs/qworker/inc/qworkerMsg.h @@ -30,21 +30,21 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req); -int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code); -int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code); -int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, +int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code); +int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code); +int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code); void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); -int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn); -int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code); -int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code); -int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo, int32_t num); +int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn); +int32_t qwBuildAndSendReadyRsp(SRpcHandleInfo *pConn, int32_t code); +int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code); +int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num); void qwFreeFetchRsp(void *msg); int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); int32_t qwGetSchTasksStatus(SQWorker *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp); -int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *rsp, int32_t code); -int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn); -int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SQWConnInfo *pConn); +int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *rsp, int32_t code); +int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn); +int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *pConn); #ifdef __cplusplus } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 717958c033..e649c67a17 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -536,7 +536,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { int32_t resNum = 0; QW_ERR_RET(qGetExplainExecInfo(ctx->taskHandle, &resNum, &execInfo)); - SQWConnInfo connInfo = {0}; + SRpcHandleInfo connInfo = {0}; connInfo.handle = ctx->ctrlConnInfo.handle; connInfo.refId = ctx->ctrlConnInfo.refId; @@ -723,8 +723,8 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { int32_t code = 0; SQWTaskCtx * ctx = NULL; - SQWConnInfo *dropConnection = NULL; - SQWConnInfo *cancelConnection = NULL; + SRpcHandleInfo *dropConnection = NULL; + SRpcHandleInfo *cancelConnection = NULL; QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase)); @@ -842,10 +842,10 @@ _return: } int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { - int32_t code = 0; - SQWTaskCtx * ctx = NULL; - SQWConnInfo connInfo = {0}; - SQWConnInfo *readyConnection = NULL; + int32_t code = 0; + SQWTaskCtx *ctx = NULL; + SRpcHandleInfo connInfo = {0}; + SRpcHandleInfo *readyConnection = NULL; QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase)); diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index 02707ace6a..55df66d861 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -43,7 +43,7 @@ void qwFreeFetchRsp(void *msg) { } } -int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) { +int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code) { SQueryTableRsp rsp = {.code = code}; int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp); @@ -52,9 +52,7 @@ int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) { SRpcMsg rpcRsp = { .msgType = TDMT_VND_QUERY_RSP, - .info.handle = pConn->handle, - .info.ahandle = pConn->ahandle, - .info.refId = pConn->refId, + .info = pConn, .pCont = msg, .contLen = contLen, .code = code, @@ -65,14 +63,13 @@ int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) { return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) { +int32_t qwBuildAndSendReadyRsp(SRpcHandleInfo *pConn, int32_t code) { SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp)); pRsp->code = code; SRpcMsg rpcRsp = { .msgType = TDMT_VND_RES_READY_RSP, - .info.handle = pConn->handle, - .info.refId = pConn->refId, + .info = pConn, .info.ahandle = NULL, .pCont = pRsp, .contLen = sizeof(*pRsp), @@ -84,7 +81,7 @@ int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) { return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo, int32_t num) { +int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num) { SExplainRsp rsp = {.numOfPlans = num, .subplanInfo = execInfo}; int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp); @@ -93,9 +90,7 @@ int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo, SRpcMsg rpcRsp = { .msgType = TDMT_VND_EXPLAIN_RSP, - .info.handle = pConn->handle, - .info.ahandle = pConn->ahandle, - .info.refId = pConn->refId, + .info = pConn, .pCont = pRsp, .contLen = contLen, .code = 0, @@ -106,16 +101,14 @@ int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo, return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) { +int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) { int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus); void *pRsp = rpcMallocCont(contLen); tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus); SRpcMsg rpcRsp = { .msgType = TDMT_VND_QUERY_HEARTBEAT_RSP, - .info.handle = pConn->handle, - .info.ahandle = pConn->ahandle, - .info.refId = pConn->refId, + .info = pConn, .pCont = pRsp, .contLen = contLen, .code = code, @@ -126,7 +119,7 @@ int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_ return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) { +int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) { if (NULL == pRsp) { pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); memset(pRsp, 0, sizeof(SRetrieveTableRsp)); @@ -135,9 +128,7 @@ int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int3 SRpcMsg rpcRsp = { .msgType = TDMT_VND_FETCH_RSP, - .info.handle = pConn->handle, - .info.ahandle = pConn->ahandle, - .info.refId = pConn->refId, + .info = pConn, .pCont = pRsp, .contLen = sizeof(*pRsp) + dataLength, .code = code, @@ -148,15 +139,13 @@ int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int3 return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) { +int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) { STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp)); pRsp->code = code; SRpcMsg rpcRsp = { .msgType = TDMT_VND_CANCEL_TASK_RSP, - .info.handle = pConn->handle, - .info.ahandle = pConn->ahandle, - .info.refId = pConn->refId, + .info = pConn, .pCont = pRsp, .contLen = sizeof(*pRsp), .code = code, @@ -166,15 +155,13 @@ int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) { return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code) { +int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) { STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp)); pRsp->code = code; SRpcMsg rpcRsp = { .msgType = TDMT_VND_DROP_TASK_RSP, - .info.handle = pConn->handle, - .info.ahandle = pConn->ahandle, - .info.refId = pConn->refId, + .info = pConn, .pCont = pRsp, .contLen = sizeof(*pRsp), .code = code, @@ -228,9 +215,7 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) { tSerializeSShowRsp(pBuf, bufLen, &showRsp); SRpcMsg rpcMsg = { - .info.handle = pMsg->info.handle, - .info.ahandle = pMsg->info.ahandle, - .info.refId = pMsg->info.refId, + .info = pMsg->info, .pCont = pBuf, .contLen = bufLen, .code = code, @@ -246,9 +231,7 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq *pFetchRe pRsp->numOfRows = 0; SRpcMsg rpcMsg = { - .info.handle = pMsg->info.handle, - .info.ahandle = pMsg->info.ahandle, - .info.refId = pMsg->info.refId, + .info = pMsg->info, .pCont = pRsp, .contLen = sizeof(*pRsp), .code = 0, @@ -258,7 +241,7 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq *pFetchRe return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { +int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { SQueryContinueReq *req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq)); if (NULL == req) { QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq)); @@ -292,7 +275,7 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { return TSDB_CODE_SUCCESS; } -int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { +int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { STaskDropReq *req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq)); if (NULL == req) { QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq)); @@ -320,7 +303,7 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { return TSDB_CODE_SUCCESS; } -int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SQWConnInfo *pConn) { +int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *pConn) { SSchedulerHbReq req = {0}; req.header.vgId = mgmt->nodeId; req.sId = sId; -- GitLab