diff --git a/include/dnode/vnode/vnode.h b/include/dnode/vnode/vnode.h index b83ca53859ad446e501c5b2d031affb51d0dadd8..03a59b01e67cc97de84db85989abe85bf8d8f89a 100644 --- a/include/dnode/vnode/vnode.h +++ b/include/dnode/vnode/vnode.h @@ -31,8 +31,12 @@ extern "C" { /* ------------------------ TYPES EXPOSED ------------------------ */ typedef struct SVnode SVnode; +typedef struct SDnode SDnode; +typedef void (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq); + typedef struct SVnodeCfg { int32_t vgId; + SDnode *pDnode; /** vnode buffer pool options */ struct { @@ -66,16 +70,13 @@ typedef struct SVnodeCfg { SWalCfg walCfg; } SVnodeCfg; -typedef struct SDnode SDnode; -typedef void (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq); typedef struct { int32_t sver; - SDnode *pDnode; char *timezone; char *locale; char *charset; - PutReqToVQueryQFp putReqToVQueryQFp; uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO) + PutReqToVQueryQFp putReqToVQueryQFp; } SVnodeOpt; /* ------------------------ SVnode ------------------------ */ @@ -100,7 +101,7 @@ void vnodeClear(); * @param pVnodeCfg options of the vnode * @return SVnode* The vnode object */ -SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid); +SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg); /** * @brief Close a VNODE diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index bf27a542aedd4722d9c17c7f9e7441c546b554dc..9f585859a8622a695cb99ac07fc36c57aa3048f5 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -381,7 +381,8 @@ static void *dnodeOpenVnodeFunc(void *param) { pMgmt->openVnodes, pMgmt->totalVnodes); dndReportStartup(pDnode, "open-vnodes", stepDesc); - SVnode *pImpl = vnodeOpen(pCfg->path, NULL, pCfg->vgId); + SVnodeCfg cfg = {.pDnode = pDnode, .vgId = pCfg->vgId}; + SVnode *pImpl = vnodeOpen(pCfg->path, &cfg); if (pImpl == NULL) { dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); pThread->failed++; @@ -581,7 +582,8 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { return -1; } - SVnode *pImpl = vnodeOpen(wrapperCfg.path, NULL /*pCfg*/, pCreate->vgId); + vnodeCfg.pDnode = pDnode; + SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg); if (pImpl == NULL) { dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr()); return -1; diff --git a/source/dnode/mgmt/impl/src/dnode.c b/source/dnode/mgmt/impl/src/dnode.c index f3016feda5c7a9bb0a95272eb9c9758e17981ffd..ea42db96ab56aef1a2a1d615187bbb225143a214 100644 --- a/source/dnode/mgmt/impl/src/dnode.c +++ b/source/dnode/mgmt/impl/src/dnode.c @@ -200,12 +200,11 @@ SDnode *dndInit(SDnodeOpt *pOption) { SVnodeOpt vnodeOpt = { .sver = pDnode->opt.sver, - .pDnode = pDnode, .timezone = pDnode->opt.timezone, .locale = pDnode->opt.locale, .charset = pDnode->opt.charset, - .putReqToVQueryQFp = dndPutMsgToVQueryQ, .nthreads = pDnode->opt.numOfCommitThreads, + .putReqToVQueryQFp = dndPutMsgToVQueryQ, }; if (vnodeInit(&vnodeOpt) != 0) { dError("failed to init vnode env"); diff --git a/source/dnode/vnode/impl/inc/vnodeDef.h b/source/dnode/vnode/impl/inc/vnodeDef.h index 361fdd10e0cc4bb320ab57a890721d8dddbe4f22..4f53dcd899414e9a7740f87cb62c1df13b3451b2 100644 --- a/source/dnode/vnode/impl/inc/vnodeDef.h +++ b/source/dnode/vnode/impl/inc/vnodeDef.h @@ -77,11 +77,12 @@ struct SVnode { SVnodeFS* pFs; tsem_t canCommit; SQHandle* pQuery; + SDnode* pDnode; }; int vnodeScheduleTask(SVnodeTask* task); -void vnodePutReqToVQueryQ(struct SRpcMsg *pReq); +void vnodePutReqToVQueryQ(SVnode *pVnode, struct SRpcMsg *pReq); #ifdef __cplusplus } diff --git a/source/dnode/vnode/impl/src/vnodeMain.c b/source/dnode/vnode/impl/src/vnodeMain.c index eb4b45bc20f010e7b2b9ca73e69e3eea82461766..85ccc9879ec4471c85e9df06fd69db4055c7580c 100644 --- a/source/dnode/vnode/impl/src/vnodeMain.c +++ b/source/dnode/vnode/impl/src/vnodeMain.c @@ -15,27 +15,29 @@ #include "vnodeDef.h" -static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid); +static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg); static void vnodeFree(SVnode *pVnode); static int vnodeOpenImpl(SVnode *pVnode); static void vnodeCloseImpl(SVnode *pVnode); -SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid) { +SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { SVnode *pVnode = NULL; // Set default options - //if (pVnodeCfg == NULL) { - pVnodeCfg = &defaultVnodeOptions; - //} + SVnodeCfg cfg = defaultVnodeOptions; + if (pVnodeCfg != NULL) { + cfg.vgId = pVnodeCfg->vgId; + cfg.pDnode = pVnodeCfg->pDnode; + } // Validate options - if (vnodeValidateOptions(pVnodeCfg) < 0) { + if (vnodeValidateOptions(&cfg) < 0) { // TODO return NULL; } // Create the handle - pVnode = vnodeNew(path, pVnodeCfg, vid); + pVnode = vnodeNew(path, &cfg); if (pVnode == NULL) { // TODO: handle error return NULL; @@ -62,7 +64,7 @@ void vnodeClose(SVnode *pVnode) { void vnodeDestroy(const char *path) { taosRemoveDir(path); } /* ------------------------ STATIC METHODS ------------------------ */ -static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid) { +static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) { SVnode *pVnode = NULL; pVnode = (SVnode *)calloc(1, sizeof(*pVnode)); @@ -71,7 +73,8 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vi return NULL; } - pVnode->vgId = vid; + pVnode->vgId = pVnodeCfg->vgId; + pVnode->pDnode = pVnodeCfg->pDnode; pVnode->path = strdup(path); vnodeOptionsCopy(&(pVnode->config), pVnodeCfg); diff --git a/source/dnode/vnode/impl/src/vnodeMgr.c b/source/dnode/vnode/impl/src/vnodeMgr.c index 43527c13feba632d11d866b3b1e92f914f0a7009..51f33031ac7a12850d24875afc89a9870ed48ee6 100644 --- a/source/dnode/vnode/impl/src/vnodeMgr.c +++ b/source/dnode/vnode/impl/src/vnodeMgr.c @@ -26,7 +26,6 @@ int vnodeInit(const SVnodeOpt *pOption) { vnodeMgr.stop = false; vnodeMgr.putReqToVQueryQFp = pOption->putReqToVQueryQFp; - vnodeMgr.pDnode = pOption->pDnode; // Start commit handers if (pOption->nthreads > 0) { @@ -91,10 +90,10 @@ int vnodeScheduleTask(SVnodeTask* pTask) { return 0; } -void vnodePutReqToVQueryQ(struct SRpcMsg* pReq) { - if (vnodeMgr.putReqToVQueryQFp) { - (*vnodeMgr.putReqToVQueryQFp)(vnodeMgr.pDnode, pReq); - } +void vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) { + assert(vnodeMgr.putReqToVQueryQFp); + assert(pVnode->pDnode); + (*vnodeMgr.putReqToVQueryQFp)(pVnode->pDnode, pReq); } /* ------------------------ STATIC METHODS ------------------------ */