diff --git a/include/dnode/vnode/vnode.h b/include/dnode/vnode/vnode.h index 03a59b01e67cc97de84db85989abe85bf8d8f89a..4332fc8e58ca3ccc16a787c52a8459edc8f53010 100644 --- a/include/dnode/vnode/vnode.h +++ b/include/dnode/vnode/vnode.h @@ -32,7 +32,7 @@ extern "C" { /* ------------------------ TYPES EXPOSED ------------------------ */ typedef struct SVnode SVnode; typedef struct SDnode SDnode; -typedef void (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq); +typedef int32_t (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq); typedef struct SVnodeCfg { int32_t vgId; diff --git a/source/dnode/mgmt/impl/inc/dndVnodes.h b/source/dnode/mgmt/impl/inc/dndVnodes.h index b5fae629592986923476ac44ce28d908bd628554..a78db602febe4a1ca8274eebf00e9be5674203f7 100644 --- a/source/dnode/mgmt/impl/inc/dndVnodes.h +++ b/source/dnode/mgmt/impl/inc/dndVnodes.h @@ -36,6 +36,8 @@ int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq); int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq); int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq); +int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pReq); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 9f585859a8622a695cb99ac07fc36c57aa3048f5..2d7999fe5ac77c70d3906945efca056f79b08911 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -802,7 +802,7 @@ static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t } } -static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg) { +static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg, bool sendRsp) { int32_t code = 0; if (pQueue == NULL) { @@ -819,13 +819,15 @@ static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg) } } - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS && sendRsp) { if (pRpcMsg->msgType & 1u) { SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code}; rpcSendResponse(&rsp); } rpcFreeCont(pRpcMsg->pCont); } + + return code; } static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) { @@ -848,7 +850,7 @@ static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) { void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); if (pVnode != NULL) { - dndWriteRpcMsgToVnodeQueue(pVnode->pWriteQ, pMsg); + (void)dndWriteRpcMsgToVnodeQueue(pVnode->pWriteQ, pMsg, true); dndReleaseVnode(pDnode, pVnode); } } @@ -856,7 +858,7 @@ void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); if (pVnode != NULL) { - dndWriteRpcMsgToVnodeQueue(pVnode->pSyncQ, pMsg); + (void)dndWriteRpcMsgToVnodeQueue(pVnode->pSyncQ, pMsg, true); dndReleaseVnode(pDnode, pVnode); } } @@ -864,7 +866,7 @@ void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); if (pVnode != NULL) { - dndWriteRpcMsgToVnodeQueue(pVnode->pQueryQ, pMsg); + (void)dndWriteRpcMsgToVnodeQueue(pVnode->pQueryQ, pMsg, true); dndReleaseVnode(pDnode, pVnode); } } @@ -872,11 +874,23 @@ void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); if (pVnode != NULL) { - dndWriteRpcMsgToVnodeQueue(pVnode->pFetchQ, pMsg); + (void)dndWriteRpcMsgToVnodeQueue(pVnode->pFetchQ, pMsg, true); dndReleaseVnode(pDnode, pVnode); } } +int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pMsg) { + SMsgHead *pHead = pMsg->pCont; + // pHead->vgId = htonl(pHead->vgId); + + SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId); + if (pVnode == NULL) return -1; + + int32_t code = dndWriteRpcMsgToVnodeQueue(pVnode->pFetchQ, pMsg, false); + dndReleaseVnode(pDnode, pVnode); + return code; +} + static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg) { SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); if (pVnode == NULL) return -1; diff --git a/source/dnode/mgmt/impl/src/dnode.c b/source/dnode/mgmt/impl/src/dnode.c index ea42db96ab56aef1a2a1d615187bbb225143a214..f3d5e095641efea5f1ce9326da982b930ac5f4b3 100644 --- a/source/dnode/mgmt/impl/src/dnode.c +++ b/source/dnode/mgmt/impl/src/dnode.c @@ -153,8 +153,6 @@ static void dndCleanupEnv(SDnode *pDnode) { taosStopCacheRefreshWorker(); } -static void dndPutMsgToVQueryQ(SDnode *pDnode, SRpcMsg *pRpcMsg) { dndProcessVnodeQueryMsg(pDnode, pRpcMsg, NULL); } - SDnode *dndInit(SDnodeOpt *pOption) { taosIgnSIGPIPE(); taosBlockSIGPIPE(); @@ -204,7 +202,7 @@ SDnode *dndInit(SDnodeOpt *pOption) { .locale = pDnode->opt.locale, .charset = pDnode->opt.charset, .nthreads = pDnode->opt.numOfCommitThreads, - .putReqToVQueryQFp = dndPutMsgToVQueryQ, + .putReqToVQueryQFp = dndPutReqToVQueryQ, }; if (vnodeInit(&vnodeOpt) != 0) { dError("failed to init vnode env"); diff --git a/source/dnode/vnode/impl/inc/vnodeDef.h b/source/dnode/vnode/impl/inc/vnodeDef.h index 4f53dcd899414e9a7740f87cb62c1df13b3451b2..f9172dd351e8df29d81d4d358d209a064bd5c11d 100644 --- a/source/dnode/vnode/impl/inc/vnodeDef.h +++ b/source/dnode/vnode/impl/inc/vnodeDef.h @@ -82,7 +82,7 @@ struct SVnode { int vnodeScheduleTask(SVnodeTask* task); -void vnodePutReqToVQueryQ(SVnode *pVnode, struct SRpcMsg *pReq); +int32_t vnodePutReqToVQueryQ(SVnode *pVnode, struct SRpcMsg *pReq); #ifdef __cplusplus } diff --git a/source/dnode/vnode/impl/src/vnodeMgr.c b/source/dnode/vnode/impl/src/vnodeMgr.c index 51f33031ac7a12850d24875afc89a9870ed48ee6..cc369a0d154816e2656ad67717fc6f2e57739155 100644 --- a/source/dnode/vnode/impl/src/vnodeMgr.c +++ b/source/dnode/vnode/impl/src/vnodeMgr.c @@ -90,10 +90,12 @@ int vnodeScheduleTask(SVnodeTask* pTask) { return 0; } -void vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) { - assert(vnodeMgr.putReqToVQueryQFp); - assert(pVnode->pDnode); - (*vnodeMgr.putReqToVQueryQFp)(pVnode->pDnode, pReq); +int32_t vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) { + if (pVnode == NULL || pVnode->pDnode == NULL || vnodeMgr.putReqToVQueryQFp == NULL) { + terrno = TSDB_CODE_VND_APP_ERROR; + return -1; + } + return (*vnodeMgr.putReqToVQueryQFp)(pVnode->pDnode, pReq); } /* ------------------------ STATIC METHODS ------------------------ */