From 53b4c06d9b1442c0063dc6520fa8982d913f8da3 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 11 Jan 2022 17:07:34 -0800 Subject: [PATCH] add msg queue --- include/dnode/vnode/vnode.h | 2 +- source/dnode/mgmt/impl/inc/dndVnodes.h | 2 ++ source/dnode/mgmt/impl/src/dndVnodes.c | 26 ++++++++++++++++++++------ source/dnode/mgmt/impl/src/dnode.c | 4 +--- source/dnode/vnode/impl/inc/vnodeDef.h | 2 +- source/dnode/vnode/impl/src/vnodeMgr.c | 10 ++++++---- 6 files changed, 31 insertions(+), 15 deletions(-) diff --git a/include/dnode/vnode/vnode.h b/include/dnode/vnode/vnode.h index 03a59b01e6..4332fc8e58 100644 --- a/include/dnode/vnode/vnode.h +++ b/include/dnode/vnode/vnode.h @@ -32,7 +32,7 @@ extern "C" { /* ------------------------ TYPES EXPOSED ------------------------ */ typedef struct SVnode SVnode; typedef struct SDnode SDnode; -typedef void (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq); +typedef int32_t (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq); typedef struct SVnodeCfg { int32_t vgId; diff --git a/source/dnode/mgmt/impl/inc/dndVnodes.h b/source/dnode/mgmt/impl/inc/dndVnodes.h index b5fae62959..a78db602fe 100644 --- a/source/dnode/mgmt/impl/inc/dndVnodes.h +++ b/source/dnode/mgmt/impl/inc/dndVnodes.h @@ -36,6 +36,8 @@ int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq); int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq); int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq); +int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pReq); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 9f585859a8..2d7999fe5a 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -802,7 +802,7 @@ static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t } } -static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg) { +static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg, bool sendRsp) { int32_t code = 0; if (pQueue == NULL) { @@ -819,13 +819,15 @@ static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg) } } - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS && sendRsp) { if (pRpcMsg->msgType & 1u) { SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code}; rpcSendResponse(&rsp); } rpcFreeCont(pRpcMsg->pCont); } + + return code; } static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) { @@ -848,7 +850,7 @@ static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) { void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); if (pVnode != NULL) { - dndWriteRpcMsgToVnodeQueue(pVnode->pWriteQ, pMsg); + (void)dndWriteRpcMsgToVnodeQueue(pVnode->pWriteQ, pMsg, true); dndReleaseVnode(pDnode, pVnode); } } @@ -856,7 +858,7 @@ void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); if (pVnode != NULL) { - dndWriteRpcMsgToVnodeQueue(pVnode->pSyncQ, pMsg); + (void)dndWriteRpcMsgToVnodeQueue(pVnode->pSyncQ, pMsg, true); dndReleaseVnode(pDnode, pVnode); } } @@ -864,7 +866,7 @@ void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); if (pVnode != NULL) { - dndWriteRpcMsgToVnodeQueue(pVnode->pQueryQ, pMsg); + (void)dndWriteRpcMsgToVnodeQueue(pVnode->pQueryQ, pMsg, true); dndReleaseVnode(pDnode, pVnode); } } @@ -872,11 +874,23 @@ void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); if (pVnode != NULL) { - dndWriteRpcMsgToVnodeQueue(pVnode->pFetchQ, pMsg); + (void)dndWriteRpcMsgToVnodeQueue(pVnode->pFetchQ, pMsg, true); dndReleaseVnode(pDnode, pVnode); } } +int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pMsg) { + SMsgHead *pHead = pMsg->pCont; + // pHead->vgId = htonl(pHead->vgId); + + SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId); + if (pVnode == NULL) return -1; + + int32_t code = dndWriteRpcMsgToVnodeQueue(pVnode->pFetchQ, pMsg, false); + dndReleaseVnode(pDnode, pVnode); + return code; +} + static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg) { SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); if (pVnode == NULL) return -1; diff --git a/source/dnode/mgmt/impl/src/dnode.c b/source/dnode/mgmt/impl/src/dnode.c index ea42db96ab..f3d5e09564 100644 --- a/source/dnode/mgmt/impl/src/dnode.c +++ b/source/dnode/mgmt/impl/src/dnode.c @@ -153,8 +153,6 @@ static void dndCleanupEnv(SDnode *pDnode) { taosStopCacheRefreshWorker(); } -static void dndPutMsgToVQueryQ(SDnode *pDnode, SRpcMsg *pRpcMsg) { dndProcessVnodeQueryMsg(pDnode, pRpcMsg, NULL); } - SDnode *dndInit(SDnodeOpt *pOption) { taosIgnSIGPIPE(); taosBlockSIGPIPE(); @@ -204,7 +202,7 @@ SDnode *dndInit(SDnodeOpt *pOption) { .locale = pDnode->opt.locale, .charset = pDnode->opt.charset, .nthreads = pDnode->opt.numOfCommitThreads, - .putReqToVQueryQFp = dndPutMsgToVQueryQ, + .putReqToVQueryQFp = dndPutReqToVQueryQ, }; if (vnodeInit(&vnodeOpt) != 0) { dError("failed to init vnode env"); diff --git a/source/dnode/vnode/impl/inc/vnodeDef.h b/source/dnode/vnode/impl/inc/vnodeDef.h index 4f53dcd899..f9172dd351 100644 --- a/source/dnode/vnode/impl/inc/vnodeDef.h +++ b/source/dnode/vnode/impl/inc/vnodeDef.h @@ -82,7 +82,7 @@ struct SVnode { int vnodeScheduleTask(SVnodeTask* task); -void vnodePutReqToVQueryQ(SVnode *pVnode, struct SRpcMsg *pReq); +int32_t vnodePutReqToVQueryQ(SVnode *pVnode, struct SRpcMsg *pReq); #ifdef __cplusplus } diff --git a/source/dnode/vnode/impl/src/vnodeMgr.c b/source/dnode/vnode/impl/src/vnodeMgr.c index 51f33031ac..cc369a0d15 100644 --- a/source/dnode/vnode/impl/src/vnodeMgr.c +++ b/source/dnode/vnode/impl/src/vnodeMgr.c @@ -90,10 +90,12 @@ int vnodeScheduleTask(SVnodeTask* pTask) { return 0; } -void vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) { - assert(vnodeMgr.putReqToVQueryQFp); - assert(pVnode->pDnode); - (*vnodeMgr.putReqToVQueryQFp)(pVnode->pDnode, pReq); +int32_t vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) { + if (pVnode == NULL || pVnode->pDnode == NULL || vnodeMgr.putReqToVQueryQFp == NULL) { + terrno = TSDB_CODE_VND_APP_ERROR; + return -1; + } + return (*vnodeMgr.putReqToVQueryQFp)(pVnode->pDnode, pReq); } /* ------------------------ STATIC METHODS ------------------------ */ -- GitLab