diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 595c03b752f15814a9eb0cbcd5ae91feb3183e8c..b1c64e1b40fb983fe3dd8b888230e40a509a9986 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2319,14 +2319,6 @@ typedef struct { #pragma pack(pop) -struct SRpcMsg; -struct SEpSet; -struct SMgmtWrapper; -typedef int32_t (*PutToQueueFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq); -typedef int32_t (*SendReqFp)(struct SMgmtWrapper* pWrapper, struct SEpSet* epSet, struct SRpcMsg* pReq); -typedef int32_t (*SendMnodeReqFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq); -typedef void (*SendRspFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pRsp); - #ifdef __cplusplus } #endif diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h new file mode 100644 index 0000000000000000000000000000000000000000..413cb957d2c12d5a8ae11c063d6ebc592b084ae9 --- /dev/null +++ b/include/common/tmsgcb.h @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_COMMON_MSG_CB_H_ +#define _TD_COMMON_MSG_CB_H_ + +#include "os.h" + +#ifdef __cplusplus +extern "C" { +#endif + +struct SRpcMsg; +struct SEpSet; +struct SMgmtWrapper; +typedef struct SMgmtWrapper SMgmtWrapper; + +typedef int32_t (*PutToQueueFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq); +typedef int32_t (*SendReqFp)(struct SMgmtWrapper* pWrapper, struct SEpSet* epSet, struct SRpcMsg* pReq); +typedef int32_t (*SendMnodeReqFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq); +typedef void (*SendRspFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pRsp); + +typedef enum { QUERY_QUEUE, FETCH_QUEUE, WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, QUEUE_MAX } EMsgQueueType; + +typedef struct { + struct SMgmtWrapper* pWrapper; + PutToQueueFp queueFps[QUEUE_MAX]; + SendReqFp sendReqFp; + SendMnodeReqFp sendMnodeReqFp; + SendRspFp sendRspFp; +} SMsgCb; + +int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EMsgQueueType qtype, struct SRpcMsg* pReq); +int32_t tmsgSendReq(const SMsgCb* pMsgCb, struct SEpSet* epSet, struct SRpcMsg* pReq); +int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, struct SRpcMsg* pReq); +void tmsgSendRsp(const SMsgCb* pMsgCb, struct SRpcMsg* pRsp); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_COMMON_MSG_CB_H_*/ diff --git a/include/dnode/bnode/bnode.h b/include/dnode/bnode/bnode.h index 6f5888bdeb1702434ef7e94f9a0c482ff534faa7..528cea8828a8cb63356ed04cb6514df876feb772 100644 --- a/include/dnode/bnode/bnode.h +++ b/include/dnode/bnode/bnode.h @@ -16,22 +16,20 @@ #ifndef _TD_BNODE_H_ #define _TD_BNODE_H_ +#include "tmsgcb.h" + #ifdef __cplusplus extern "C" { #endif /* ------------------------ TYPES EXPOSED ------------------------ */ -typedef struct SMgmtWrapper SMgmtWrapper; -typedef struct SBnode SBnode; +typedef struct SBnode SBnode; typedef struct { } SBnodeLoad; typedef struct { - SMgmtWrapper *pWrapper; - SendReqFp sendReqFp; - SendMnodeReqFp sendMnodeReqFp; - SendRspFp sendRspFp; + SMsgCb msgCb; } SBnodeOpt; /* ------------------------ SBnode ------------------------ */ diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index d421b2e45d1ea0751aa3aaff0b42144ff293e247..eed13e031753495e8eb36e33a96d31f8048bc88f 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -17,27 +17,22 @@ #define _TD_MND_H_ #include "monitor.h" +#include "tmsgcb.h" #ifdef __cplusplus extern "C" { #endif /* ------------------------ TYPES EXPOSED ------------------------ */ -typedef struct SMgmtWrapper SMgmtWrapper; -typedef struct SMnode SMnode; +typedef struct SMnode SMnode; typedef struct { - int32_t dnodeId; - int64_t clusterId; - int8_t replica; - int8_t selfIndex; - SReplica replicas[TSDB_MAX_REPLICA]; - SMgmtWrapper *pWrapper; - PutToQueueFp putToWriteQFp; - PutToQueueFp putToReadQFp; - SendReqFp sendReqFp; - SendMnodeReqFp sendMnodeReqFp; - SendRspFp sendRspFp; + int32_t dnodeId; + int64_t clusterId; + int8_t replica; + int8_t selfIndex; + SReplica replicas[TSDB_MAX_REPLICA]; + SMsgCb msgCb; } SMnodeOpt; /* ------------------------ SMnode ------------------------ */ diff --git a/include/dnode/qnode/qnode.h b/include/dnode/qnode/qnode.h index 354d8d2819fdce7d379502dbd3d73070023fa66e..67ee509622de950b8ede9e134cca9163f87b7d22 100644 --- a/include/dnode/qnode/qnode.h +++ b/include/dnode/qnode/qnode.h @@ -16,13 +16,14 @@ #ifndef _TD_QNODE_H_ #define _TD_QNODE_H_ +#include "tmsgcb.h" + #ifdef __cplusplus extern "C" { #endif /* ------------------------ TYPES EXPOSED ------------------------ */ -typedef struct SMgmtWrapper SMgmtWrapper; -typedef struct SQnode SQnode; +typedef struct SQnode SQnode; typedef struct { int64_t numOfStartTask; @@ -36,10 +37,7 @@ typedef struct { } SQnodeLoad; typedef struct { - SMgmtWrapper *pWrapper; - SendReqFp sendReqFp; - SendMnodeReqFp sendMnodeReqFp; - SendRspFp sendRspFp; + SMsgCb msgCb; } SQnodeOpt; /* ------------------------ SQnode ------------------------ */ diff --git a/include/dnode/snode/snode.h b/include/dnode/snode/snode.h index 4855f71a76dd7d800b29ed37e07a86bece253066..037dca968ab2cf58ee142215aa4d78612d7482f8 100644 --- a/include/dnode/snode/snode.h +++ b/include/dnode/snode/snode.h @@ -16,7 +16,7 @@ #ifndef _TD_SNODE_H_ #define _TD_SNODE_H_ -#include "tcommon.h" +#include "tmsgcb.h" #include "tmsg.h" #include "trpc.h" @@ -25,17 +25,13 @@ extern "C" { #endif /* ------------------------ TYPES EXPOSED ------------------------ */ -typedef struct SMgmtWrapper SMgmtWrapper; -typedef struct SSnode SSnode; +typedef struct SSnode SSnode; typedef struct { } SSnodeLoad; typedef struct { - SMgmtWrapper *pWrapper; - SendReqFp sendReqFp; - SendMnodeReqFp sendMnodeReqFp; - SendRspFp sendRspFp; + SMsgCb msgCb; } SSnodeOpt; /* ------------------------ SSnode ------------------------ */ diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 8954cbaa0ec1f583b0524413ce9e73eb189e0d16..944ac97ddbbf667d4496bf6fd240d1ea2751096d 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -20,6 +20,7 @@ extern "C" { #endif +#include "tmsgcb.h" #include "trpc.h" @@ -48,11 +49,7 @@ typedef struct { uint64_t numOfErrors; } SQWorkerStat; -typedef int32_t (*putReqToQueryQFp)(void *, struct SRpcMsg *); -typedef int32_t (*sendReqFp)(void *, struct SEpSet *, struct SRpcMsg *); - -int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj, - putReqToQueryQFp fp1, sendReqFp fp2); +int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb); int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); diff --git a/source/common/src/tmsgcb.c b/source/common/src/tmsgcb.c new file mode 100644 index 0000000000000000000000000000000000000000..19973770e4432f84001e19f11640435c6accab6b --- /dev/null +++ b/source/common/src/tmsgcb.c @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "tmsgcb.h" + +int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EMsgQueueType qtype, struct SRpcMsg* pReq) { + return (*pMsgCb->queueFps[qtype])(pMsgCb->pWrapper, pReq); +} + +int32_t tmsgSendReq(const SMsgCb* pMsgCb, struct SEpSet* epSet, struct SRpcMsg* pReq) { + return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq); +} + +int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, struct SRpcMsg* pReq) { + return (*pMsgCb->sendMnodeReqFp)(pMsgCb->pWrapper, pReq); +} + +void tmsgSendRsp(const SMsgCb* pMsgCb, struct SRpcMsg* pRsp) { return (*pMsgCb->sendRspFp)(pMsgCb->pWrapper, pRsp); } diff --git a/source/dnode/bnode/inc/bndInt.h b/source/dnode/bnode/inc/bndInt.h index 4a0d16a7498c1166b3d4742111cf7b2f676abbcc..e611d230a329ad597ba5b7ee634c3246b6c44294 100644 --- a/source/dnode/bnode/inc/bndInt.h +++ b/source/dnode/bnode/inc/bndInt.h @@ -30,7 +30,7 @@ extern "C" { #endif typedef struct SBnode { - SBnodeOpt opt; + SMsgCb msgCb; } SBnode; #ifdef __cplusplus diff --git a/source/dnode/bnode/src/bnode.c b/source/dnode/bnode/src/bnode.c index 2e5c96ecdda07824fe057400e088c6514d6b736e..4236e85a7f4c28d5c93d7e5d0144c82bdad6f8fe 100644 --- a/source/dnode/bnode/src/bnode.c +++ b/source/dnode/bnode/src/bnode.c @@ -17,6 +17,7 @@ SBnode *bndOpen(const char *path, const SBnodeOpt *pOption) { SBnode *pBnode = calloc(1, sizeof(SBnode)); + pBnode->msgCb = pOption->msgCb; return pBnode; } diff --git a/source/dnode/mgmt/bnode/src/bmInt.c b/source/dnode/mgmt/bnode/src/bmInt.c index 85102cbffc6742477346351dd11cbbd49e73379c..34c6dbeb571338379e62919ef0bb2bd784a82948 100644 --- a/source/dnode/mgmt/bnode/src/bmInt.c +++ b/source/dnode/mgmt/bnode/src/bmInt.c @@ -19,11 +19,12 @@ static int32_t bmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); } static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) { - SDnode *pDnode = pMgmt->pDnode; - pOption->pWrapper = pMgmt->pWrapper; - pOption->sendReqFp = dndSendReqToDnode; - pOption->sendMnodeReqFp = dndSendReqToMnode; - pOption->sendRspFp = dndSendRsp; + SMsgCb msgCb = {0}; + msgCb.pWrapper = pMgmt->pWrapper; + msgCb.sendReqFp = dndSendReqToDnode; + msgCb.sendMnodeReqFp = dndSendReqToMnode; + msgCb.sendRspFp = dndSendRsp; + pOption->msgCb = msgCb; } static int32_t bmOpenImp(SBnodeMgmt *pMgmt) { diff --git a/source/dnode/mgmt/mnode/src/mmInt.c b/source/dnode/mgmt/mnode/src/mmInt.c index a3c2d40510c47640ff5e54dc46270545c9b4c341..aa348ae87947b99f189a9ab50fec41c1507652ea 100644 --- a/source/dnode/mgmt/mnode/src/mmInt.c +++ b/source/dnode/mgmt/mnode/src/mmInt.c @@ -39,14 +39,17 @@ static int32_t mmRequire(SMgmtWrapper *pWrapper, bool *required) { static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { SDnode *pDnode = pMgmt->pDnode; - pOption->pWrapper = pMgmt->pWrapper; - pOption->putToWriteQFp = mmPutMsgToWriteQueue; - pOption->putToReadQFp = mmPutMsgToReadQueue; - pOption->sendReqFp = dndSendReqToDnode; - pOption->sendMnodeReqFp = dndSendReqToMnode; - pOption->sendRspFp = dndSendRsp; pOption->dnodeId = pDnode->dnodeId; pOption->clusterId = pDnode->clusterId; + + SMsgCb msgCb = {0}; + msgCb.pWrapper = pMgmt->pWrapper; + msgCb.queueFps[QUERY_QUEUE] = mmPutMsgToReadQueue; + msgCb.queueFps[WRITE_QUEUE] = mmPutMsgToWriteQueue; + msgCb.sendReqFp = dndSendReqToDnode; + msgCb.sendMnodeReqFp = dndSendReqToMnode; + msgCb.sendRspFp = dndSendRsp; + pOption->msgCb = msgCb; } static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { diff --git a/source/dnode/mgmt/qnode/src/qmInt.c b/source/dnode/mgmt/qnode/src/qmInt.c index f9a39a877482f5a536c6b362792b21714c846ccf..38defd1dc8ff55612fb4c03c8b4f38ec12d11b1d 100644 --- a/source/dnode/mgmt/qnode/src/qmInt.c +++ b/source/dnode/mgmt/qnode/src/qmInt.c @@ -19,11 +19,12 @@ static int32_t qmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); } static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) { - SDnode *pDnode = pMgmt->pDnode; - pOption->pWrapper = pMgmt->pWrapper; - pOption->sendReqFp = dndSendReqToDnode; - pOption->sendMnodeReqFp = dndSendReqToMnode; - pOption->sendRspFp = dndSendRsp; + SMsgCb msgCb = {0}; + msgCb.pWrapper = pMgmt->pWrapper; + msgCb.sendReqFp = dndSendReqToDnode; + msgCb.sendMnodeReqFp = dndSendReqToMnode; + msgCb.sendRspFp = dndSendRsp; + pOption->msgCb = msgCb; } static int32_t qmOpenImp(SQnodeMgmt *pMgmt) { diff --git a/source/dnode/mgmt/snode/src/smInt.c b/source/dnode/mgmt/snode/src/smInt.c index 62f2edacb751cbb03dcace5779414021c914f87c..b4d6576e5740d40388f6d5d6a04efc8aa1316669 100644 --- a/source/dnode/mgmt/snode/src/smInt.c +++ b/source/dnode/mgmt/snode/src/smInt.c @@ -19,11 +19,12 @@ static int32_t smRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); } static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) { - SDnode *pDnode = pMgmt->pDnode; - pOption->pWrapper = pMgmt->pWrapper; - pOption->sendReqFp = dndSendReqToDnode; - pOption->sendMnodeReqFp = dndSendReqToMnode; - pOption->sendRspFp = dndSendRsp; + SMsgCb msgCb = {0}; + msgCb.pWrapper = pMgmt->pWrapper; + msgCb.sendReqFp = dndSendReqToDnode; + msgCb.sendMnodeReqFp = dndSendReqToMnode; + msgCb.sendRspFp = dndSendRsp; + pOption->msgCb = msgCb; } static int32_t smOpenImp(SSnodeMgmt *pMgmt) { diff --git a/source/dnode/mgmt/vnode/src/vmInt.c b/source/dnode/mgmt/vnode/src/vmInt.c index c5e79765c76871858ac0099c71f647921babfcbc..12f38d71ca28ab1b3cd59fbb336007e3fbf2a7da 100644 --- a/source/dnode/mgmt/vnode/src/vmInt.c +++ b/source/dnode/mgmt/vnode/src/vmInt.c @@ -128,7 +128,13 @@ static void *vmOpenVnodeFunc(void *param) { pMgmt->state.openVnodes, pMgmt->state.totalVnodes); dndReportStartup(pDnode, "open-vnodes", stepDesc); - SVnodeCfg cfg = {.pWrapper = pMgmt->pWrapper, .pTfs = pMgmt->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid}; + SMsgCb msgCb = {0}; + msgCb.pWrapper = pMgmt->pWrapper; + msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; + msgCb.sendReqFp = dndSendReqToDnode; + msgCb.sendMnodeReqFp = dndSendReqToMnode; + msgCb.sendRspFp = dndSendRsp; + SVnodeCfg cfg = {.msgCb = msgCb, .pTfs = pMgmt->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid}; SVnode *pImpl = vnodeOpen(pCfg->path, &cfg); if (pImpl == NULL) { dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); @@ -262,7 +268,6 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) { SDnode *pDnode = pWrapper->pDnode; SVnodesMgmt *pMgmt = calloc(1, sizeof(SVnodesMgmt)); int32_t code = -1; - SVnodeOpt vnodeOpt = {0}; dInfo("vnodes-mgmt start to init"); if (pMgmt == NULL) goto _OVER; @@ -294,13 +299,7 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) { goto _OVER; } - vnodeOpt.nthreads = tsNumOfCommitThreads; - vnodeOpt.putToQueryQFp = vmPutMsgToQueryQueue; - vnodeOpt.putToFetchQFp = vmPutMsgToQueryQueue; - vnodeOpt.sendReqFp = dndSendReqToDnode; - vnodeOpt.sendMnodeReqFp = dndSendReqToMnode; - vnodeOpt.sendRspFp = dndSendRsp; - if (vnodeInit(&vnodeOpt) != 0) { + if (vnodeInit() != 0) { dError("failed to init vnode since %s", terrstr()); goto _OVER; } diff --git a/source/dnode/mgmt/vnode/src/vmMsg.c b/source/dnode/mgmt/vnode/src/vmMsg.c index 53423845d4ede2728a623ab9a40203dee2757945..efae47b195e18fb03be03f12c6fde6fcf0b7f6d2 100644 --- a/source/dnode/mgmt/vnode/src/vmMsg.c +++ b/source/dnode/mgmt/vnode/src/vmMsg.c @@ -82,7 +82,14 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return -1; } - vnodeCfg.pWrapper = pMgmt->pWrapper; + SMsgCb msgCb = {0}; + msgCb.pWrapper = pMgmt->pWrapper; + msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; + msgCb.sendReqFp = dndSendReqToDnode; + msgCb.sendMnodeReqFp = dndSendReqToMnode; + msgCb.sendRspFp = dndSendRsp; + + vnodeCfg.msgCb = msgCb; vnodeCfg.pTfs = pMgmt->pTfs; vnodeCfg.dbId = wrapperCfg.dbUid; SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg); diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 88dd3a5c22c534d186264c555a7097dc8aaa8664..d5bffada6afb6394ff6efed20c43e856d6765698 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -119,19 +119,12 @@ typedef struct SMnode { SHashObj *infosMeta; SGrantInfo grant; MndMsgFp msgFp[TDMT_MAX]; - SendReqFp sendReqFp; - SendMnodeReqFp sendMnodeReqFp; - PutToQueueFp putToWriteQFp; - PutToQueueFp putToReadQFp; + SMsgCb msgCb; } SMnode; -int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg); -int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg); -void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp); - +void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp); uint64_t mndGenerateUid(char *name, int32_t len); - -void mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); +void mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index bf79c89a1c19b6fddd4bab70ce13e67e5effff72..c44a5aaaa310d62f33c95d191ed9863e87ff7ba1 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -623,7 +623,7 @@ static int32_t mndProcessConfigDnodeReq(SNodeMsg *pReq) { .msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen, .ahandle = pReq->rpcMsg.ahandle}; mInfo("dnode:%d, app:%p config:%s req send to dnode", cfgReq.dnodeId, rpcMsg.ahandle, cfgReq.config); - mndSendReqToDnode(pMnode, &epSet, &rpcMsg); + tmsgSendReq(&pMnode->msgCb, &epSet, &rpcMsg); return 0; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 25ec5f7cd470700f140d331f5b60ac362a11fcf1..4fbf0352e5c021433e451aa7e5f564e099b5c07b 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -420,7 +420,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) { .pCont = pRebMsg, .contLen = sizeof(SMqDoRebalanceMsg), }; - (*pMnode->putToWriteQFp)(pMnode->pWrapper, &rpcMsg); + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } else { taosHashCleanup(pRebMsg->rebSubHash); rpcFreeCont(pRebMsg); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index d3b36aed31a39d592310a1d679e86b2f0f180b63..e2a6e49b56288dbb533f85e6630795d3289f2674 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -888,7 +888,7 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr } memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen); - if (mndSendReqToDnode(pMnode, &pAction->epSet, &rpcMsg) == 0) { + if (tmsgSendReq(&pMnode->msgCb, &pAction->epSet, &rpcMsg) == 0) { mDebug("trans:%d, action:%d is sent", pTrans->id, action); pAction->msgSent = 1; pAction->msgReceived = 0; diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 39f2155109f164e3897df5ca2e425d2a2f8de1b2..6400bf69f1db66ba13db55fdf8cfc84303b96853 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -43,24 +43,6 @@ #define TRNAS_TIMER_MS 6000 #define TELEM_TIMER_MS 86400000 -int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *pMsg) { - if (pMnode == NULL || pMnode->sendReqFp == NULL) { - terrno = TSDB_CODE_MND_NOT_READY; - return -1; - } - - return (*pMnode->sendReqFp)(pMnode->pWrapper, pEpSet, pMsg); -} - -int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg) { - if (pMnode == NULL || pMnode->sendReqFp == NULL) { - terrno = TSDB_CODE_MND_NOT_READY; - return -1; - } - - return (*pMnode->sendMnodeReqFp)(pMnode->pWrapper, pMsg); -} - static void *mndBuildTimerMsg(int32_t *pContLen) { SMTimerReq timerReq = {0}; @@ -80,7 +62,7 @@ static void mndPullupTrans(void *param, void *tmrId) { int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen}; - pMnode->putToWriteQFp(pMnode->pWrapper, &rpcMsg); + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer); @@ -96,7 +78,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) { .pCont = pReq, .contLen = contLen, }; - pMnode->putToReadQFp(pMnode->pWrapper, &rpcMsg); + tmsgPutToQueue(&pMnode->msgCb, QUERY_QUEUE, &rpcMsg); } taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer); @@ -108,7 +90,7 @@ static void mndPullupTelem(void *param, void *tmrId) { int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen}; - pMnode->putToReadQFp(pMnode->pWrapper, &rpcMsg); + tmsgPutToQueue(&pMnode->msgCb, QUERY_QUEUE, &rpcMsg); } taosTmrReset(mndPullupTelem, TELEM_TIMER_MS, pMnode, pMnode->timer, &pMnode->telemTimer); @@ -286,14 +268,9 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { pMnode->replica = pOption->replica; pMnode->selfIndex = pOption->selfIndex; memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); - pMnode->pWrapper = pOption->pWrapper; - pMnode->putToWriteQFp = pOption->putToWriteQFp; - pMnode->putToReadQFp = pOption->putToReadQFp; - pMnode->sendReqFp = pOption->sendReqFp; - pMnode->sendMnodeReqFp = pOption->sendMnodeReqFp; - - if (pMnode->sendReqFp == NULL || pMnode->sendMnodeReqFp == NULL || - pMnode->putToWriteQFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0) { + pMnode->msgCb = pOption->msgCb; + + if (pMnode->dnodeId < 0 || pMnode->clusterId < 0) { terrno = TSDB_CODE_MND_INVALID_OPTIONS; return -1; } diff --git a/source/dnode/qnode/inc/qndInt.h b/source/dnode/qnode/inc/qndInt.h index 357a62052aea7c096c5ac87465c126af9de8338f..168e538ac0147a5966f9869b217160b5ac52484c 100644 --- a/source/dnode/qnode/inc/qndInt.h +++ b/source/dnode/qnode/inc/qndInt.h @@ -32,8 +32,8 @@ typedef struct SQWorkerMgmt SQHandle; typedef struct SQnode { int32_t qndId; - SQnodeOpt opt; - SQHandle* pQuery; + SMsgCb msgCb; + SQHandle* pQuery; } SQnode; #ifdef __cplusplus diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index a257b343c26daa6cd6234ef9bf5d072263c6c4da..37fd4cbed527ec22149b30d6753c93e98f74dfaf 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -13,14 +13,10 @@ * along with this program. If not, see . */ +#include "executor.h" #include "qndInt.h" #include "query.h" #include "qworker.h" -#include "executor.h" - -int32_t qnodePutReqToVQueryQ(SQnode* pQnode, struct SRpcMsg* pReq) {} -void qnodeSendReqToDnode(SQnode* pQnode, struct SEpSet* epSet, struct SRpcMsg* pReq) {} - SQnode *qndOpen(const SQnodeOpt *pOption) { SQnode *pQnode = calloc(1, sizeof(SQnode)); @@ -29,12 +25,12 @@ SQnode *qndOpen(const SQnodeOpt *pOption) { return NULL; } - if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, NULL, (void **)&pQnode->pQuery, pQnode, - (putReqToQueryQFp)qnodePutReqToVQueryQ, (sendReqFp)qnodeSendReqToDnode)) { + if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, NULL, (void **)&pQnode->pQuery, &pOption->msgCb)) { tfree(pQnode); return NULL; } - + + pQnode->msgCb = pOption->msgCb; return pQnode; } diff --git a/source/dnode/snode/inc/sndInt.h b/source/dnode/snode/inc/sndInt.h index d1122fc4ecaae129ada2e427bb223cb6780e8e44..519b94cf46cc57ce3441712ffed6b3f7b2a7e02e 100644 --- a/source/dnode/snode/inc/sndInt.h +++ b/source/dnode/snode/inc/sndInt.h @@ -44,7 +44,7 @@ typedef struct { typedef struct SSnode { SStreamMeta* pMeta; - SSnodeOpt cfg; + SMsgCb msgCb; } SSnode; SStreamMeta* sndMetaNew(); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 1e9e48e2064db4a8404c7277fd0ba8eec367e9eb..b0ce7ad8c557028b5b471c36f3b7a8a356b04a37 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -22,7 +22,7 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { if (pSnode == NULL) { return NULL; } - memcpy(&pSnode->cfg, pOption, sizeof(SSnodeOpt)); + pSnode->msgCb = pOption->msgCb; pSnode->pMeta = sndMetaNew(); if (pSnode->pMeta == NULL) { free(pSnode); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 1ffd4e0d782f57e335caca070da7500b442ff6f5..67446d15ab7fa530eb57941d2115c7f2d62e3357 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -18,6 +18,7 @@ #include "os.h" #include "trpc.h" +#include "tmsgcb.h" #include "meta.h" #include "tarray.h" @@ -40,7 +41,6 @@ typedef struct { typedef struct { int32_t vgId; uint64_t dbId; - void *pWrapper; STfs *pTfs; uint64_t wsize; uint64_t ssize; @@ -54,20 +54,12 @@ typedef struct { SMetaCfg metaCfg; STqCfg tqCfg; SWalCfg walCfg; + SMsgCb msgCb; uint32_t hashBegin; uint32_t hashEnd; int8_t hashMethod; } SVnodeCfg; -typedef struct { - uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO) - PutToQueueFp putToQueryQFp; - PutToQueueFp putToFetchQFp; - SendReqFp sendReqFp; - SendMnodeReqFp sendMnodeReqFp; - SendRspFp sendRspFp; -} SVnodeOpt; - typedef struct { int64_t ver; int64_t tbUid; @@ -87,10 +79,9 @@ typedef struct { /** * @brief Initialize the vnode module * - * @param pOption Option of the vnode mnodule * @return int 0 for success and -1 for failure */ -int vnodeInit(const SVnodeOpt *pOption); +int vnodeInit(); /** * @brief Cleanup the vnode module diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index de9b7bac837aea248a018e04b10ea54782c7c983..c911f95e4aa4fe28afec3e46178d973ca1e1eec9 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -52,12 +52,6 @@ typedef struct SVnodeMgr { TdThreadMutex mutex; TdThreadCond hasTask; TD_DLIST(SVnodeTask) queue; - // For vnode Mgmt - PutToQueueFp putToQueryQFp; - PutToQueueFp putToFetchQFp; - SendReqFp sendReqFp; - SendMnodeReqFp sendMnodeReqFp; - SendRspFp sendRspFp; } SVnodeMgr; extern SVnodeMgr vnodeMgr; @@ -81,7 +75,7 @@ struct SVnode { SWal* pWal; tsem_t canCommit; SQHandle* pQuery; - void* pWrapper; + SMsgCb msgCb; STfs* pTfs; }; diff --git a/source/dnode/vnode/src/vnd/vnodeMain.c b/source/dnode/vnode/src/vnd/vnodeMain.c index c7405fdceaf1caa0ea04015249b044b06de561b1..86e670d533999baab19b332ff5617982df65a2ba 100644 --- a/source/dnode/vnode/src/vnd/vnodeMain.c +++ b/source/dnode/vnode/src/vnd/vnodeMain.c @@ -27,7 +27,7 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { SVnodeCfg cfg = defaultVnodeOptions; if (pVnodeCfg != NULL) { cfg.vgId = pVnodeCfg->vgId; - cfg.pWrapper = pVnodeCfg->pWrapper; + cfg.msgCb = pVnodeCfg->msgCb; cfg.pTfs = pVnodeCfg->pTfs; cfg.dbId = pVnodeCfg->dbId; cfg.hashBegin = pVnodeCfg->hashBegin; @@ -79,7 +79,7 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) { } pVnode->vgId = pVnodeCfg->vgId; - pVnode->pWrapper = pVnodeCfg->pWrapper; + pVnode->msgCb = pVnodeCfg->msgCb; pVnode->pTfs = pVnodeCfg->pTfs; pVnode->path = strdup(path); vnodeOptionsCopy(&(pVnode->config), pVnodeCfg); diff --git a/source/dnode/vnode/src/vnd/vnodeMgr.c b/source/dnode/vnode/src/vnd/vnodeMgr.c index 51aaf9e68f9f1209af184cc724d59604ace3add4..920b5b0947f93b1aaf04b8085b0571dd14a2d0e3 100644 --- a/source/dnode/vnode/src/vnd/vnodeMgr.c +++ b/source/dnode/vnode/src/vnd/vnodeMgr.c @@ -14,43 +14,33 @@ */ #include "vnd.h" +#include "tglobal.h" SVnodeMgr vnodeMgr = {.vnodeInitFlag = TD_MOD_UNINITIALIZED}; static void* loop(void* arg); -int vnodeInit(const SVnodeOpt *pOption) { +int vnodeInit() { if (TD_CHECK_AND_SET_MODE_INIT(&(vnodeMgr.vnodeInitFlag)) == TD_MOD_INITIALIZED) { return 0; } vnodeMgr.stop = false; - vnodeMgr.putToQueryQFp = pOption->putToQueryQFp; - vnodeMgr.putToFetchQFp = pOption->putToFetchQFp; - vnodeMgr.sendReqFp = pOption->sendReqFp; - vnodeMgr.sendMnodeReqFp = pOption->sendMnodeReqFp; - vnodeMgr.sendRspFp = pOption->sendRspFp; // Start commit handers - if (pOption->nthreads > 0) { - vnodeMgr.nthreads = pOption->nthreads; - vnodeMgr.threads = (TdThread*)calloc(pOption->nthreads, sizeof(TdThread)); - if (vnodeMgr.threads == NULL) { - return -1; - } + vnodeMgr.nthreads = tsNumOfCommitThreads; + vnodeMgr.threads = calloc(vnodeMgr.nthreads, sizeof(TdThread)); + if (vnodeMgr.threads == NULL) { + return -1; + } - taosThreadMutexInit(&(vnodeMgr.mutex), NULL); - taosThreadCondInit(&(vnodeMgr.hasTask), NULL); - TD_DLIST_INIT(&(vnodeMgr.queue)); + taosThreadMutexInit(&(vnodeMgr.mutex), NULL); + taosThreadCondInit(&(vnodeMgr.hasTask), NULL); + TD_DLIST_INIT(&(vnodeMgr.queue)); - for (uint16_t i = 0; i < pOption->nthreads; i++) { - taosThreadCreate(&(vnodeMgr.threads[i]), NULL, loop, NULL); - // pthread_setname_np(vnodeMgr.threads[i], "VND Commit Thread"); - } - } else { - // TODO: if no commit thread is set, then another mechanism should be - // given. Otherwise, it is a false. - ASSERT(0); + for (uint16_t i = 0; i < vnodeMgr.nthreads; i++) { + taosThreadCreate(&(vnodeMgr.threads[i]), NULL, loop, NULL); + // pthread_setname_np(vnodeMgr.threads[i], "VND Commit Thread"); } if (walInit() < 0) { @@ -92,26 +82,6 @@ int vnodeScheduleTask(SVnodeTask* pTask) { return 0; } -int32_t vnodePutToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) { - return (*vnodeMgr.putToQueryQFp)(pVnode->pWrapper, pReq); -} - -int32_t vnodePutToVFetchQ(SVnode* pVnode, struct SRpcMsg* pReq) { - return (*vnodeMgr.putToFetchQFp)(pVnode->pWrapper, pReq); -} - -int32_t vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq) { - return (*vnodeMgr.sendReqFp)(pVnode->pWrapper, epSet, pReq); -} - -int32_t vnodeSendMnodeReq(SVnode* pVnode, struct SRpcMsg* pReq) { - return (*vnodeMgr.sendMnodeReqFp)(pVnode->pWrapper, pReq); -} - -void vnodeSendRsp(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pRsp) { - (*vnodeMgr.sendRspFp)(pVnode->pWrapper, pRsp); -} - /* ------------------------ STATIC METHODS ------------------------ */ static void* loop(void* arg) { setThreadName("vnode-commit"); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 3132c675e9c576264d25e55dcdcc2da76b6655e1..d7af04c462d8068a33ac83b3760a585dae55b3d5 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -20,8 +20,7 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg); static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg); int vnodeQueryOpen(SVnode *pVnode) { - return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, (void **)&pVnode->pQuery, pVnode, - (putReqToQueryQFp)vnodePutToVQueryQ, (sendReqFp)vnodeSendReq); + return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, (void **)&pVnode->pQuery, &pVnode->msgCb); } void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); } diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index f806a7b2125491bf2f7beb6f2242200d43e4c0ae..84c7f9194f90b57ef0036eae0527e2ebfa673083 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -20,6 +20,7 @@ extern "C" { #endif +#include "qworker.h" #include "tlockfree.h" #include "ttimer.h" @@ -139,18 +140,16 @@ typedef struct SQWSchStatus { // Qnode/Vnode level task management typedef struct SQWorkerMgmt { - SQWorkerCfg cfg; - int8_t nodeType; - int32_t nodeId; - void *timer; - tmr_h hbTimer; - SRWLatch schLock; - //SRWLatch ctxLock; - SHashObj *schHash; //key: schedulerId, value: SQWSchStatus - SHashObj *ctxHash; //key: queryId+taskId, value: SQWTaskCtx - void *nodeObj; - putReqToQueryQFp putToQueueFp; - sendReqFp sendReqFp; + SQWorkerCfg cfg; + int8_t nodeType; + int32_t nodeId; + void *timer; + tmr_h hbTimer; + SRWLatch schLock; + // SRWLatch ctxLock; + SHashObj *schHash; // key: schedulerId, value: SQWSchStatus + SHashObj *ctxHash; // key: queryId+taskId, value: SQWTaskCtx + SMsgCb msgCb; } SQWorkerMgmt; #define QW_FPARAMS_DEF SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 73611a5ed9480eef852f01044b510d9015bf0e78..55dbb893d622df387027a1142011928accfc70c8 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1442,9 +1442,8 @@ _return: taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); } -int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj, - putReqToQueryQFp fp1, sendReqFp fp2) { - if (NULL == qWorkerMgmt || NULL == nodeObj || NULL == fp1 || NULL == fp2) { +int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) { + if (NULL == qWorkerMgmt || pMsgCb->pWrapper == NULL) { qError("invalid param to init qworker"); QW_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -1500,9 +1499,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW mgmt->nodeType = nodeType; mgmt->nodeId = nodeId; - mgmt->nodeObj = nodeObj; - mgmt->putToQueueFp = fp1; - mgmt->sendReqFp = fp2; + mgmt->msgCb = *pMsgCb; *qWorkerMgmt = mgmt; diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index 7d633d1c731480122e3b737ea1c18d336723de2e..25107430a73aed14102a9dce55b5aa6cde0d3f5a 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -245,15 +245,15 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, void *connection) { req->taskId = tId; SRpcMsg pNewMsg = { - .handle = pMsg->handle, - .ahandle = pMsg->ahandle, - .msgType = TDMT_VND_QUERY_CONTINUE, - .pCont = req, - .contLen = sizeof(SQueryContinueReq), - .code = 0, + .handle = pMsg->handle, + .ahandle = pMsg->ahandle, + .msgType = TDMT_VND_QUERY_CONTINUE, + .pCont = req, + .contLen = sizeof(SQueryContinueReq), + .code = 0, }; - int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg); + int32_t code = tmsgPutToQueue(&mgmt->msgCb, QUERY_QUEUE, &pNewMsg); if (TSDB_CODE_SUCCESS != code) { QW_SCH_TASK_ELOG("put query continue msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(code)); rpcFreeCont(req); diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index a45ec41e6c0e3b5ff7d4d4eac987dc2315934059..30c0e2fca21a4233885890f479eb1bdee5836e9d 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -1080,7 +1080,10 @@ TEST(rcTest, shortExecshortDelay) { qwtTestStop = false; qwtTestQuitThreadNum = 0; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, NULL); + SMsgCb msgCb = {0}; + msgCb.pWrapper = (struct SMgmtWrapper *)mockPointer; + msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue; + code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); ASSERT_EQ(code, 0); qwtTestMaxExecTaskUsec = 0; @@ -1161,7 +1164,10 @@ TEST(rcTest, longExecshortDelay) { qwtTestStop = false; qwtTestQuitThreadNum = 0; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, NULL); + SMsgCb msgCb = {0}; + msgCb.pWrapper = (struct SMgmtWrapper *)mockPointer; + msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue; + code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); ASSERT_EQ(code, 0); qwtTestMaxExecTaskUsec = 1000000; @@ -1244,7 +1250,10 @@ TEST(rcTest, shortExeclongDelay) { qwtTestStop = false; qwtTestQuitThreadNum = 0; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, NULL); + SMsgCb msgCb = {0}; + msgCb.pWrapper = (struct SMgmtWrapper *)mockPointer; + msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue; + code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); ASSERT_EQ(code, 0); qwtTestMaxExecTaskUsec = 0;