diff --git a/include/dnode/bnode/bnode.h b/include/dnode/bnode/bnode.h index 7f7b8a8d53f1fa249704d6e5510273ae4247da4a..7d47a5ddf7e832683d03a0fb5a9a381af5e18ed3 100644 --- a/include/dnode/bnode/bnode.h +++ b/include/dnode/bnode/bnode.h @@ -36,7 +36,7 @@ typedef struct { int32_t dnodeId; int64_t clusterId; SDnode *pDnode; - SendReqToDnodeFp sendReqToDnodeFp; + SendReqToDnodeFp sendReqFp; SendReqToMnodeFp sendReqToMnodeFp; SendRedirectRspFp sendRedirectRspFp; } SBnodeOpt; diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 8b90a006d36f2d6417e9d12b16c78a5a0e162bce..1b6938b7c5997b7376ce70bdf66f78acb28f29de 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -40,7 +40,7 @@ typedef struct { SDnode *pDnode; PutReqToMWriteQFp putReqToMWriteQFp; PutReqToMReadQFp putReqToMReadQFp; - SendReqToDnodeFp sendReqToDnodeFp; + SendReqToDnodeFp sendReqFp; SendReqToMnodeFp sendReqToMnodeFp; SendRedirectRspFp sendRedirectRspFp; } SMnodeOpt; diff --git a/include/dnode/qnode/qnode.h b/include/dnode/qnode/qnode.h index 3de2986047c4b5bf1e047e294b46999149abd433..1ab27bb1ebe04376bce1d297eeb78c86edf2553e 100644 --- a/include/dnode/qnode/qnode.h +++ b/include/dnode/qnode/qnode.h @@ -43,7 +43,7 @@ typedef struct { int32_t dnodeId; int64_t clusterId; SDnode *pDnode; - SendReqToDnodeFp sendReqToDnodeFp; + SendReqToDnodeFp sendReqFp; SendReqToMnodeFp sendReqToMnodeFp; SendRedirectRspFp sendRedirectRspFp; } SQnodeOpt; diff --git a/include/dnode/snode/snode.h b/include/dnode/snode/snode.h index 21a93532e0d77b3c8f5e6157380e7804b393d622..0e82b8cbf5bc721068288d354f80c2d424dcac39 100644 --- a/include/dnode/snode/snode.h +++ b/include/dnode/snode/snode.h @@ -39,7 +39,7 @@ typedef struct { int32_t dnodeId; int64_t clusterId; SDnode *pDnode; - SendReqToDnodeFp sendReqToDnodeFp; + SendReqToDnodeFp sendReqFp; SendReqToMnodeFp sendReqToMnodeFp; SendRedirectRspFp sendRedirectRspFp; } SSnodeOpt; diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index dd3103edf1baf4f161f7dd7781934c818a919059..8954cbaa0ec1f583b0524413ce9e73eb189e0d16 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -49,10 +49,10 @@ typedef struct { } SQWorkerStat; typedef int32_t (*putReqToQueryQFp)(void *, struct SRpcMsg *); -typedef int32_t (*sendReqToDnodeFp)(void *, struct SEpSet *, struct SRpcMsg *); +typedef int32_t (*sendReqFp)(void *, struct SEpSet *, struct SRpcMsg *); int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj, - putReqToQueryQFp fp1, sendReqToDnodeFp fp2); + putReqToQueryQFp fp1, sendReqFp fp2); int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); diff --git a/include/libs/tfs/tfs.h b/include/libs/tfs/tfs.h index 1b41da33bbbcfa60567837c9a886533b42062b54..efea80a7889b6ff48f5bb8f14332b6a6851ddcc6 100644 --- a/include/libs/tfs/tfs.h +++ b/include/libs/tfs/tfs.h @@ -244,7 +244,7 @@ void tfsClosedir(STfsDir *pDir); * @param pTfs The fs object. * @param pInfo The info object. */ -int32_t tfsGetMonitorInfo(STfs *pTfs, SMonDiskInfo *pInfo); +void tfsGetMonitorInfo(STfs *pTfs, SMonDiskInfo *pInfo); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/bnode/src/bmMgmt.c b/source/dnode/mgmt/bnode/src/bmMgmt.c index 6c86850d3bbe5221832e9261855a054069f5085f..90b6d06564ca17df00b07276a90b59c6ad784574 100644 --- a/source/dnode/mgmt/bnode/src/bmMgmt.c +++ b/source/dnode/mgmt/bnode/src/bmMgmt.c @@ -177,7 +177,7 @@ static void dndStopBnodeWorker(SDnode *pDnode) { static void dndBuildBnodeOption(SDnode *pDnode, SBnodeOpt *pOption) { pOption->pDnode = pDnode; - pOption->sendReqToDnodeFp = dndSendReqToDnode; + pOption->sendReqFp = dndSendReqToDnode; pOption->sendReqToMnodeFp = dndSendReqToMnode; pOption->sendRedirectRspFp = dmSendRedirectRsp; pOption->dnodeId = dmGetDnodeId(pDnode); diff --git a/source/dnode/mgmt/container/src/dndMonitor.c b/source/dnode/mgmt/container/src/dndMonitor.c index 8b7ff389f6a1fefdb0d6e29afc5ccd17adf3601c..a25c1a299be6ead20ab1ee414357b2462d790da2 100644 --- a/source/dnode/mgmt/container/src/dndMonitor.c +++ b/source/dnode/mgmt/container/src/dndMonitor.c @@ -25,7 +25,9 @@ static int32_t dndGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo) { tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name)); pInfo->tempdir.size = tsTempSpace.size; - return vmGetTfsMonitorInfo(dndGetWrapper(pDnode, VNODES), pInfo); + vmGetTfsMonitorInfo(dndGetWrapper(pDnode, VNODES), pInfo); + + return 0; } static void dndGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) { @@ -48,7 +50,7 @@ static void dndGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) { taosGetCardInfo(&pInfo->net_in, &pInfo->net_out); taosGetProcIO(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk); - vmGetVndMonitorInfo(dndGetWrapper(pDnode, VNODES), pInfo); + vmGetVnodeReqs(dndGetWrapper(pDnode, VNODES), pInfo); pInfo->has_mnode = (dndGetWrapper(pDnode, MNODE)->required); } diff --git a/source/dnode/mgmt/dnode/src/dmMsg.c b/source/dnode/mgmt/dnode/src/dmMsg.c index 28d7eb0d29b6a19ec772f6d87b54ade24cda0f4b..d2541f74e2f795a72ca08c6b3cbcceb546f52b64 100644 --- a/source/dnode/mgmt/dnode/src/dmMsg.c +++ b/source/dnode/mgmt/dnode/src/dmMsg.c @@ -44,7 +44,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { taosRUnLockLatch(&pMgmt->latch); req.pVloads = taosArrayInit(TSDB_MAX_VNODES, sizeof(SVnodeLoad)); - vmGetVnodeLoads(pDnode, req.pVloads); + vmGetVnodeLoads(dndGetWrapper(pDnode, VNODES), req.pVloads); int32_t contLen = tSerializeSStatusReq(NULL, 0, &req); void *pHead = rpcMallocCont(contLen); diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index e0daf5260a5289bc3b8d9179d6a3e1c44f1fcc48..e3c32225d58788e6a65e58e24a4102edcae9e610 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -99,21 +99,6 @@ static void dmProcessQueue(SDnode *pDnode, SNodeMsg *pNodeMsg) { case TDMT_MND_GRANT_RSP: dmProcessGrantRsp(pDnode, pMsg); break; - case TDMT_DND_CREATE_VNODE: - code = vmProcessCreateVnodeReq(pDnode, pMsg); - break; - case TDMT_DND_ALTER_VNODE: - code = vmProcessAlterVnodeReq(pDnode, pMsg); - break; - case TDMT_DND_DROP_VNODE: - code = vmProcessDropVnodeReq(pDnode, pMsg); - break; - case TDMT_DND_SYNC_VNODE: - code = vmProcessSyncVnodeReq(pDnode, pMsg); - break; - case TDMT_DND_COMPACT_VNODE: - code = vmProcessCompactVnodeReq(pDnode, pMsg); - break; default: terrno = TSDB_CODE_MSG_NOT_PROCESSED; code = -1; diff --git a/source/dnode/mgmt/mnode/inc/mmMsg.h b/source/dnode/mgmt/mnode/inc/mmMsg.h index fdafff901fb9712403aaabdfdbfe16d2788bf3d8..f07c44705bc6f2e43f6f4f86dbc44db138217596 100644 --- a/source/dnode/mgmt/mnode/inc/mmMsg.h +++ b/source/dnode/mgmt/mnode/inc/mmMsg.h @@ -22,7 +22,8 @@ extern "C" { #endif -void mmInitMsgHandles(SMgmtWrapper *pWrapper); +void mmInitMsgHandles(SMgmtWrapper *pWrapper); + int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg); diff --git a/source/dnode/mgmt/mnode/src/mmInt.c b/source/dnode/mgmt/mnode/src/mmInt.c index ff829361510022de9446f1bac1c207975f75440e..36f4b40fa814aed03da72be26f873d32890463c2 100644 --- a/source/dnode/mgmt/mnode/src/mmInt.c +++ b/source/dnode/mgmt/mnode/src/mmInt.c @@ -140,7 +140,7 @@ static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { SDnode *pDnode = pMgmt->pDnode; pOption->pDnode = pDnode; - pOption->sendReqToDnodeFp = dndSendReqToDnode; + pOption->sendReqFp = dndSendReqToDnode; pOption->sendReqToMnodeFp = dndSendReqToMnode; pOption->sendRedirectRspFp = dmSendRedirectRsp; pOption->putReqToMWriteQFp = mmPutMsgToWriteQueue; diff --git a/source/dnode/mgmt/qnode/src/qmMgmt.c b/source/dnode/mgmt/qnode/src/qmMgmt.c index b5147cdf938a91c360e972a07b784c2e256cbb42..c29704582ae1904bd8fb5d5f97787d772da0e7c9 100644 --- a/source/dnode/mgmt/qnode/src/qmMgmt.c +++ b/source/dnode/mgmt/qnode/src/qmMgmt.c @@ -183,7 +183,7 @@ static void dndStopQnodeWorker(SDnode *pDnode) { static void dndBuildQnodeOption(SDnode *pDnode, SQnodeOpt *pOption) { pOption->pDnode = pDnode; - pOption->sendReqToDnodeFp = dndSendReqToDnode; + pOption->sendReqFp = dndSendReqToDnode; pOption->sendReqToMnodeFp = dndSendReqToMnode; pOption->sendRedirectRspFp = dmSendRedirectRsp; pOption->dnodeId = dmGetDnodeId(pDnode); diff --git a/source/dnode/mgmt/snode/src/smMgmt.c b/source/dnode/mgmt/snode/src/smMgmt.c index 40e7a7ca93a0f0435a6e819152dd57e9f4535989..12468085b1c37e9e0dae6fb8ac5d2c227e00c4a0 100644 --- a/source/dnode/mgmt/snode/src/smMgmt.c +++ b/source/dnode/mgmt/snode/src/smMgmt.c @@ -208,7 +208,7 @@ static void dndStopSnodeWorker(SDnode *pDnode) { static void dndBuildSnodeOption(SDnode *pDnode, SSnodeOpt *pOption) { pOption->pDnode = pDnode; - pOption->sendReqToDnodeFp = dndSendReqToDnode; + pOption->sendReqFp = dndSendReqToDnode; pOption->sendReqToMnodeFp = dndSendReqToMnode; pOption->sendRedirectRspFp = dmSendRedirectRsp; pOption->dnodeId = dmGetDnodeId(pDnode); diff --git a/source/dnode/mgmt/vnode/inc/vmFile.h b/source/dnode/mgmt/vnode/inc/vmFile.h index 677ba14c074cb1bee6d14eca4f287ee0529f3cea..585eafaf21ecee964e550c74210d3b8c2313b73c 100644 --- a/source/dnode/mgmt/vnode/inc/vmFile.h +++ b/source/dnode/mgmt/vnode/inc/vmFile.h @@ -22,8 +22,9 @@ extern "C" { #endif -int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes); -int32_t vmWriteVnodesToFile(SVnodesMgmt *pMgmt); +int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes); +int32_t vmWriteVnodesToFile(SVnodesMgmt *pMgmt); +SVnodeObj **vmGetVnodesFromHash(SVnodesMgmt *pMgmt, int32_t *numOfVnodes); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/vnode/inc/vmInt.h b/source/dnode/mgmt/vnode/inc/vmInt.h index aedece54376dd19db0eb4ebffd0d38bc012c8ef4..a3f1b3ad1f45a05ad0b426680b00b2e53ade9690 100644 --- a/source/dnode/mgmt/vnode/inc/vmInt.h +++ b/source/dnode/mgmt/vnode/inc/vmInt.h @@ -86,6 +86,9 @@ typedef struct { // interface void vmGetMgmtFp(SMgmtWrapper *pWrapper); +void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SArray *pLoads); +void vmGetTfsMonitorInfo(SMgmtWrapper *pWrapper, SMonDiskInfo *pInfo); +void vmGetVnodeReqs(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo); // vmInt.h SVnodeObj *vmAcquireVnode(SVnodesMgmt *pMgmt, int32_t vgId); @@ -93,23 +96,11 @@ void vmReleaseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl); void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); -int32_t dndInitVnodes(SDnode *pDnode); -void dndCleanupVnodes(SDnode *pDnode); -void vmGetVnodeLoads(SDnode *pDnode, SArray *pLoads); void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -int32_t vmProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq); -int32_t vmProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq); -int32_t vmProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq); -int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq); -int32_t vmProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq); -int32_t vmProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq); - -int32_t vmGetTfsMonitorInfo(SMgmtWrapper *pWrapper, SMonDiskInfo *pInfo); -void vmGetVndMonitorInfo(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/vnode/inc/vmMsg.h b/source/dnode/mgmt/vnode/inc/vmMsg.h index 168c4ca74c8060e77d367bb00eab4579b846dbe7..b17d2b24b93f4a371d004d4d796f204dc05ddddd 100644 --- a/source/dnode/mgmt/vnode/inc/vmMsg.h +++ b/source/dnode/mgmt/vnode/inc/vmMsg.h @@ -22,11 +22,11 @@ extern "C" { #endif -void vmInitMsgHandles(SMgmtWrapper *pWrapper); +void vmInitMsgHandles(SMgmtWrapper *pWrapper); + int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq); int32_t vmProcessAlterVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq); int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq); -int32_t dndProcessAuthVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq); int32_t vmProcessSyncVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq); int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq); diff --git a/source/dnode/mgmt/vnode/inc/vmWorker.h b/source/dnode/mgmt/vnode/inc/vmWorker.h index c921bdffb88d15947a1545170267c1ca4970d3ff..005615c23e7c0e62699e562ee6d4e17f0f4afa1b 100644 --- a/source/dnode/mgmt/vnode/inc/vmWorker.h +++ b/source/dnode/mgmt/vnode/inc/vmWorker.h @@ -24,13 +24,11 @@ extern "C" { int32_t vmStartWorker(SVnodesMgmt *pMgmt); void vmStopWorker(SVnodesMgmt *pMgmt); +int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); +void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); -void vmInitMsgFp(SMnodeMgmt *pMgmt); -void vmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -int32_t vmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); -int32_t vmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); -void vmConsumeChildQueue(SDnode *pDnode, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); -void vmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); +int32_t vmPutMsgToQueryQueue(SVnodesMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmPutMsgToApplyQueue(SVnodesMgmt *pMgmt, int32_t vgId, SRpcMsg *pMsg); int32_t vmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t vmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); diff --git a/source/dnode/mgmt/vnode/src/vmFile.c b/source/dnode/mgmt/vnode/src/vmFile.c index 5b903ef0084468f9537fa06bd41761986c830329..fb3b00b081de853ce19d450c328d7873698e5121 100644 --- a/source/dnode/mgmt/vnode/src/vmFile.c +++ b/source/dnode/mgmt/vnode/src/vmFile.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "vmFile.h" -static SVnodeObj **vmGetVnodesFromHash(SVnodesMgmt *pMgmt, int32_t *numOfVnodes) { +SVnodeObj **vmGetVnodesFromHash(SVnodesMgmt *pMgmt, int32_t *numOfVnodes) { taosRLockLatch(&pMgmt->latch); int32_t num = 0; diff --git a/source/dnode/mgmt/vnode/src/vmInt.c b/source/dnode/mgmt/vnode/src/vmInt.c index 48fe6359de2cefd5ca9c2a34ba3e63a1113c7c5a..c591a0f4dd9c4abbaa3109cccadb5bb29c1357be 100644 --- a/source/dnode/mgmt/vnode/src/vmInt.c +++ b/source/dnode/mgmt/vnode/src/vmInt.c @@ -17,6 +17,7 @@ #include "vmFile.h" #include "vmMsg.h" #include "vmWorker.h" +#include "sync.h" SVnodeObj *vmAcquireVnode(SVnodesMgmt *pMgmt, int32_t vgId) { SVnodeObj *pVnode = NULL; @@ -41,7 +42,6 @@ SVnodeObj *vmAcquireVnode(SVnodesMgmt *pMgmt, int32_t vgId) { void vmReleaseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { if (pVnode == NULL) return; - SVnodesMgmt *pMgmt = &pDnode->vmgmt; taosRLockLatch(&pMgmt->latch); int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); taosRUnLockLatch(&pMgmt->latch); @@ -70,7 +70,7 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { return -1; } - if (dndAllocVnodeQueue(pDnode, pVnode) != 0) { + if (vmAllocQueue(pMgmt, pVnode) != 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -86,7 +86,6 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { } void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; taosWLockLatch(&pMgmt->latch); taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t)); taosWUnLockLatch(&pMgmt->latch); @@ -99,7 +98,7 @@ void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10); - dndFreeVnodeQueue(pDnode, pVnode); + vmFreeQueue(pMgmt, pVnode); vnodeClose(pVnode->pImpl); pVnode->pImpl = NULL; @@ -131,13 +130,13 @@ static void *vmOpenVnodeFunc(void *param) { pMgmt->state.openVnodes, pMgmt->state.totalVnodes); dndReportStartup(pDnode, "open-vnodes", stepDesc); - SVnodeCfg cfg = {.pDnode = pDnode, .pTfs = pMgmt->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid}; + SVnodeCfg cfg = {.pMgmt = pMgmt, .pTfs = pMgmt->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid}; SVnode *pImpl = vnodeOpen(pCfg->path, &cfg); if (pImpl == NULL) { dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); pThread->failed++; } else { - vmOpenVnode(pDnode, pCfg, pImpl); + vmOpenVnode(pMgmt, pCfg, pImpl); dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex); pThread->opened++; } @@ -163,7 +162,7 @@ static int32_t vmOpenVnodes(SVnodesMgmt *pMgmt) { SWrapperCfg *pCfgs = NULL; int32_t numOfVnodes = 0; - if (vmGetVnodesFromFile(pDnode, &pCfgs, &numOfVnodes) != 0) { + if (vmGetVnodesFromFile(pMgmt, &pCfgs, &numOfVnodes) != 0) { dInfo("failed to get vnode list from disk since %s", terrstr()); return -1; } @@ -229,10 +228,10 @@ static void vmCloseVnodes(SVnodesMgmt *pMgmt) { dInfo("start to close all vnodes"); int32_t numOfVnodes = 0; - SVnodeObj **pVnodes = dndGetVnodesFromHash(pDnode, &numOfVnodes); + SVnodeObj **pVnodes = vmGetVnodesFromHash(pMgmt, &numOfVnodes); for (int32_t i = 0; i < numOfVnodes; ++i) { - vmCloseVnode(pDnode, pVnodes[i]); + vmCloseVnode(pMgmt, pVnodes[i]); } if (pVnodes != NULL) { @@ -261,6 +260,10 @@ static void vmCleanup(SMgmtWrapper *pWrapper) { dInfo("vnodes-mgmt is cleaned up"); } +static int32_t vmSendReqToDnode(SVnodesMgmt *pMgmt, struct SEpSet *epSet, struct SRpcMsg *rpcMsg) { + return dndSendReqToDnode(pMgmt->pDnode, epSet, rpcMsg); +} + static int32_t vmInit(SMgmtWrapper *pWrapper) { SDnode *pDnode = pWrapper->pDnode; SVnodesMgmt *pMgmt = calloc(1, sizeof(SVnodesMgmt)); @@ -298,8 +301,8 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) { } vnodeOpt.nthreads = tsNumOfCommitThreads; - vnodeOpt.putReqToVQueryQFp = dndPutReqToVQueryQ; - vnodeOpt.sendReqToDnodeFp = dndSendReqToDnode; + vnodeOpt.putToQueryQFp = (VndPutToQueryQFp)vmPutMsgToQueryQueue; + vnodeOpt.sendReqFp = (VndSendReqFp)vmSendReqToDnode; if (vnodeInit(&vnodeOpt) != 0) { dError("failed to init vnode since %s", terrstr()); goto _OVER; @@ -339,15 +342,14 @@ void vmGetMgmtFp(SMgmtWrapper *pWrapper) { pWrapper->fp = mgmtFp; } -int32_t vmGetTfsMonitorInfo(SMgmtWrapper *pWrapper, SMonDiskInfo *pInfo) { +void vmGetTfsMonitorInfo(SMgmtWrapper *pWrapper, SMonDiskInfo *pInfo) { SVnodesMgmt *pMgmt = pWrapper->pMgmt; - if (pMgmt == NULL) return -1; + if (pMgmt == NULL) return; - return tfsGetMonitorInfo(pMgmt->pTfs, pInfo); - ; + tfsGetMonitorInfo(pMgmt->pTfs, pInfo); } -void vmGetVndMonitorInfo(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo) { +void vmGetVnodeReqs(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo) { SVnodesMgmt *pMgmt = pWrapper->pMgmt; if (pMgmt == NULL) return; @@ -362,9 +364,9 @@ void vmGetVndMonitorInfo(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo) { pInfo->masters = pStat->masterNum; } -void vmGetVnodeLoads(SDnode *pDnode, SArray *pLoads) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - SVnodesStat *pStat = &pMgmt->stat; +void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SArray *pLoads) { + SVnodesMgmt *pMgmt = pWrapper->pMgmt; + SVnodesStat *pStat = &pMgmt->state; int32_t totalVnodes = 0; int32_t masterNum = 0; int64_t numOfSelectReqs = 0; diff --git a/source/dnode/mgmt/vnode/src/vmMsg.c b/source/dnode/mgmt/vnode/src/vmMsg.c index 73506503fd7995f4af28d4ec345e8d90249a25e0..1251fdbc7e23ed5488a08263dab4713a2ddccf8c 100644 --- a/source/dnode/mgmt/vnode/src/vmMsg.c +++ b/source/dnode/mgmt/vnode/src/vmMsg.c @@ -17,6 +17,7 @@ #include "vmMsg.h" #include "vmFile.h" #include "vmWorker.h" +#include "dmInt.h" static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { pCfg->vgId = pCreate->vgId; @@ -83,7 +84,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq) { return -1; } - vnodeCfg.pDnode = pMgmt->pDnode; + vnodeCfg.pMgmt = pMgmt; vnodeCfg.pTfs = pMgmt->pTfs; vnodeCfg.dbId = wrapperCfg.dbUid; SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg); @@ -262,4 +263,10 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessFetchMsg); dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg); dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, vmProcessFetchMsg); + + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, vmProcessCreateVnodeReq); + dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, vmProcessAlterVnodeReq); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, vmProcessDropVnodeReq); + dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, vmProcessSyncVnodeReq); + dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, vmProcessCompactVnodeReq); } diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index b1a3adb71f0ed949ba2d58108f4f822fab39b8d0..0ae72b6479471fcf2e28541509f9279239b63e3a 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -16,11 +16,11 @@ #define _DEFAULT_SOURCE #include "vmWorker.h" -static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { vnodeProcessQueryMsg(pVnode->pImpl, pMsg); } +static void vmProcessQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { vnodeProcessQueryMsg(pVnode->pImpl, pMsg); } -static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { vnodeProcessFetchMsg(pVnode->pImpl, pMsg); } +static void vmProcessFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { vnodeProcessFetchMsg(pVnode->pImpl, pMsg); } -static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { +static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *)); for (int32_t i = 0; i < numOfMsgs; ++i) { @@ -56,7 +56,7 @@ static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_ taosArrayDestroy(pArray); } -static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { +static void vmProcessApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { SRpcMsg *pMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { @@ -68,7 +68,7 @@ static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_ } } -static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { +static void vmProcessSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { SRpcMsg *pMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { @@ -80,7 +80,7 @@ static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t } } -static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg, bool sendRsp) { +static int32_t vmWriteMsgToQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg, bool sendRsp) { int32_t code = 0; if (pQueue == NULL) { @@ -108,7 +108,7 @@ static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg, return code; } -static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) { +static SVnodeObj *vmAcquireFromMsg(SVnodesMgmt *pMgmt, SRpcMsg *pMsg) { SMsgHead *pHead = pMsg->pCont; pHead->contLen = htonl(pHead->contLen); pHead->vgId = htonl(pHead->vgId); @@ -126,51 +126,55 @@ static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) { return pVnode; } -void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); - if (pVnode != NULL) { - (void)dndWriteRpcMsgToVnodeQueue(pVnode->pWriteQ, pMsg, true); - vmReleaseVnode(pMgmt, pVnode); - } +int32_t vmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { +// SVnodeObj *pVnode = vmAcquireFromMsg(pDnode, pMsg); +// if (pVnode != NULL) { +// (void)vmWriteMsgToQueue(pVnode->pWriteQ, pMsg, true); +// vmReleaseVnode(pMgmt, pVnode); +// } +return 0; } -void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); - if (pVnode != NULL) { - (void)dndWriteRpcMsgToVnodeQueue(pVnode->pSyncQ, pMsg, true); - vmReleaseVnode(pMgmt, pVnode); - } +int32_t vmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { +// SVnodeObj *pVnode = vmAcquireFromMsg(pDnode, pMsg); +// if (pVnode != NULL) { +// (void)vmWriteMsgToQueue(pVnode->pSyncQ, pMsg, true); +// vmReleaseVnode(pMgmt, pVnode); +// } +return 0; } -void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); - if (pVnode != NULL) { - (void)dndWriteRpcMsgToVnodeQueue(pVnode->pQueryQ, pMsg, true); - vmReleaseVnode(pMgmt, pVnode); - } +int32_t vmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { +// SVnodeObj *pVnode = vmAcquireFromMsg(pDnode, pMsg); +// if (pVnode != NULL) { +// (void)vmWriteMsgToQueue(pVnode->pQueryQ, pMsg, true); +// vmReleaseVnode(pMgmt, pVnode); +// } +return 0; } -void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); - if (pVnode != NULL) { - (void)dndWriteRpcMsgToVnodeQueue(pVnode->pFetchQ, pMsg, true); - vmReleaseVnode(pMgmt, pVnode); - } +int32_t vmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg){ +// SVnodeObj *pVnode = vmAcquireFromMsg(pDnode, pMsg); +// if (pVnode != NULL) { +// (void)vmWriteMsgToQueue(pVnode->pFetchQ, pMsg, true); +// vmReleaseVnode(pMgmt, pVnode); +// } +return 0; } -int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pMsg) { +int32_t vmPutMsgToQueryQueue(SVnodesMgmt *pMgmt, SRpcMsg *pMsg) { SMsgHead *pHead = pMsg->pCont; // pHead->vgId = htonl(pHead->vgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); if (pVnode == NULL) return -1; - int32_t code = dndWriteRpcMsgToVnodeQueue(pVnode->pQueryQ, pMsg, false); + int32_t code = vmWriteMsgToQueue(pVnode->pQueryQ, pMsg, false); vmReleaseVnode(pMgmt, pVnode); return code; } -static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg) { +int32_t vmPutMsgToApplyQueue(SVnodesMgmt *pMgmt, int32_t vgId, SRpcMsg *pMsg) { SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId); if (pVnode == NULL) return -1; @@ -179,14 +183,12 @@ static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMs return code; } -static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - - pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)dndProcessVnodeWriteQueue); - pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)dndProcessVnodeApplyQueue); - pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)dndProcessVnodeSyncQueue); - pVnode->pFetchQ = tFWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)dndProcessVnodeFetchQueue); - pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)dndProcessVnodeQueryQueue); +int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { + pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue); + pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue); + pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue); + pVnode->pFetchQ = tFWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue); + pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL || pVnode->pQueryQ == NULL) { @@ -197,8 +199,7 @@ static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) { return 0; } -static void dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; +void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); tFWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ); diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 1da1673b2b671749ee5cdba43bb61818c67e4153..7b0f25d04320f8d0db9e4e8b676043ac92ae8a9f 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -119,7 +119,7 @@ typedef struct SMnode { SHashObj *infosMeta; SGrantInfo grant; MndMsgFp msgFp[TDMT_MAX]; - SendReqToDnodeFp sendReqToDnodeFp; + SendReqToDnodeFp sendReqFp; SendReqToMnodeFp sendReqToMnodeFp; SendRedirectRspFp sendRedirectRspFp; PutReqToMWriteQFp putReqToMWriteQFp; diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index ceb86d0a208ab36492a023b6cbc15931ee6b88bc..d5a629a5b0dc9683e1a781aeb9fc1b839f033d53 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -44,16 +44,16 @@ #define TELEM_TIMER_MS 86400000 int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *pMsg) { - if (pMnode == NULL || pMnode->sendReqToDnodeFp == NULL) { + if (pMnode == NULL || pMnode->sendReqFp == NULL) { terrno = TSDB_CODE_MND_NOT_READY; return -1; } - return (*pMnode->sendReqToDnodeFp)(pMnode->pDnode, pEpSet, pMsg); + return (*pMnode->sendReqFp)(pMnode->pDnode, pEpSet, pMsg); } int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg) { - if (pMnode == NULL || pMnode->sendReqToDnodeFp == NULL) { + if (pMnode == NULL || pMnode->sendReqFp == NULL) { terrno = TSDB_CODE_MND_NOT_READY; return -1; } @@ -289,11 +289,11 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { pMnode->pDnode = pOption->pDnode; pMnode->putReqToMWriteQFp = pOption->putReqToMWriteQFp; pMnode->putReqToMReadQFp = pOption->putReqToMReadQFp; - pMnode->sendReqToDnodeFp = pOption->sendReqToDnodeFp; + pMnode->sendReqFp = pOption->sendReqFp; pMnode->sendReqToMnodeFp = pOption->sendReqToMnodeFp; pMnode->sendRedirectRspFp = pOption->sendRedirectRspFp; - if (pMnode->sendReqToDnodeFp == NULL || pMnode->sendReqToMnodeFp == NULL || pMnode->sendRedirectRspFp == NULL || + if (pMnode->sendReqFp == NULL || pMnode->sendReqToMnodeFp == NULL || pMnode->sendRedirectRspFp == NULL || pMnode->putReqToMWriteQFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0) { terrno = TSDB_CODE_MND_INVALID_OPTIONS; return -1; diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index a1c3f5b0d42925d86d28a7e21dfd5dcb4cd8ae96..a257b343c26daa6cd6234ef9bf5d072263c6c4da 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -30,7 +30,7 @@ SQnode *qndOpen(const SQnodeOpt *pOption) { } if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, NULL, (void **)&pQnode->pQuery, pQnode, - (putReqToQueryQFp)qnodePutReqToVQueryQ, (sendReqToDnodeFp)qnodeSendReqToDnode)) { + (putReqToQueryQFp)qnodePutReqToVQueryQ, (sendReqFp)qnodeSendReqToDnode)) { tfree(pQnode); return NULL; } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index b6dd90a4e2586f44a3865e73ef335be92e778919..fbd1b66e276f882d6720da9c5c280ae45c98cfbb 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -31,9 +31,8 @@ extern "C" { /* ------------------------ TYPES EXPOSED ------------------------ */ typedef struct SVnode SVnode; -typedef struct SDnode SDnode; -typedef int32_t (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq); -typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); +typedef int32_t (*VndPutToQueryQFp)(void *pMgmt, struct SRpcMsg *pReq); +typedef int32_t (*VndSendReqFp)(void *pMgmt, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); typedef struct { // TODO @@ -43,7 +42,7 @@ typedef struct { typedef struct { int32_t vgId; uint64_t dbId; - SDnode *pDnode; + void *pMgmt; STfs *pTfs; uint64_t wsize; uint64_t ssize; @@ -63,9 +62,9 @@ typedef struct { } SVnodeCfg; typedef struct { - uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO) - PutReqToVQueryQFp putReqToVQueryQFp; - SendReqToDnodeFp sendReqToDnodeFp; + uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO) + VndPutToQueryQFp putToQueryQFp; + VndSendReqFp sendReqFp; } SVnodeOpt; typedef struct { diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 6f4f0049e3b728103122cbf71d9b16f37bd88c53..60ba0a3b71b61f0d2826344850377ffa22a44dd4 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -53,9 +53,8 @@ typedef struct SVnodeMgr { pthread_cond_t hasTask; TD_DLIST(SVnodeTask) queue; // For vnode Mgmt - SDnode* pDnode; - PutReqToVQueryQFp putReqToVQueryQFp; - SendReqToDnodeFp sendReqToDnodeFp; + VndPutToQueryQFp putToQueryQFp; + VndSendReqFp sendReqFp; } SVnodeMgr; extern SVnodeMgr vnodeMgr; @@ -79,7 +78,7 @@ struct SVnode { SWal* pWal; tsem_t canCommit; SQHandle* pQuery; - SDnode* pDnode; + void* pMgmt; STfs* pTfs; }; diff --git a/source/dnode/vnode/src/vnd/vnodeMain.c b/source/dnode/vnode/src/vnd/vnodeMain.c index 2a3862c7cbf37de74506bdf217fe62ccdcfd6e52..9388877fdceda9d5d1314736056a622dec8a2aab 100644 --- a/source/dnode/vnode/src/vnd/vnodeMain.c +++ b/source/dnode/vnode/src/vnd/vnodeMain.c @@ -27,7 +27,7 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { SVnodeCfg cfg = defaultVnodeOptions; if (pVnodeCfg != NULL) { cfg.vgId = pVnodeCfg->vgId; - cfg.pDnode = pVnodeCfg->pDnode; + cfg.pMgmt = pVnodeCfg->pMgmt; cfg.pTfs = pVnodeCfg->pTfs; cfg.dbId = pVnodeCfg->dbId; cfg.hashBegin = pVnodeCfg->hashBegin; @@ -79,7 +79,7 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) { } pVnode->vgId = pVnodeCfg->vgId; - pVnode->pDnode = pVnodeCfg->pDnode; + pVnode->pMgmt = pVnodeCfg->pMgmt; pVnode->pTfs = pVnodeCfg->pTfs; pVnode->path = strdup(path); vnodeOptionsCopy(&(pVnode->config), pVnodeCfg); diff --git a/source/dnode/vnode/src/vnd/vnodeMgr.c b/source/dnode/vnode/src/vnd/vnodeMgr.c index 477deed8c8944fc511c8aa3f78951d2131171f40..a01c17488bebff14bff0ec7c63c4b3ed1c1e36a2 100644 --- a/source/dnode/vnode/src/vnd/vnodeMgr.c +++ b/source/dnode/vnode/src/vnd/vnodeMgr.c @@ -25,8 +25,8 @@ int vnodeInit(const SVnodeOpt *pOption) { } vnodeMgr.stop = false; - vnodeMgr.putReqToVQueryQFp = pOption->putReqToVQueryQFp; - vnodeMgr.sendReqToDnodeFp = pOption->sendReqToDnodeFp; + vnodeMgr.putToQueryQFp = pOption->putToQueryQFp; + vnodeMgr.sendReqFp = pOption->sendReqFp; // Start commit handers if (pOption->nthreads > 0) { @@ -90,15 +90,15 @@ int vnodeScheduleTask(SVnodeTask* pTask) { } int32_t vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) { - if (pVnode == NULL || pVnode->pDnode == NULL || vnodeMgr.putReqToVQueryQFp == NULL) { + if (pVnode == NULL || pVnode->pMeta == NULL || vnodeMgr.putToQueryQFp == NULL) { terrno = TSDB_CODE_VND_APP_ERROR; return -1; } - return (*vnodeMgr.putReqToVQueryQFp)(pVnode->pDnode, pReq); + return (*vnodeMgr.putToQueryQFp)(pVnode->pMgmt, pReq); } void vnodeSendReqToDnode(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq) { - (*vnodeMgr.sendReqToDnodeFp)(pVnode->pDnode, epSet, pReq); + (*vnodeMgr.sendReqFp)(pVnode->pMgmt, epSet, pReq); } /* ------------------------ STATIC METHODS ------------------------ */ diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index e8bc6873ab0a34ed56d1455857be81c26a2ade54..9819c105120f1cbd8fe7d590ff2e57e9b8cc0b5c 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -21,7 +21,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg); int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, (void **)&pVnode->pQuery, pVnode, - (putReqToQueryQFp)vnodePutReqToVQueryQ, (sendReqToDnodeFp)vnodeSendReqToDnode); + (putReqToQueryQFp)vnodePutReqToVQueryQ, (sendReqFp)vnodeSendReqToDnode); } void vnodeQueryClose(SVnode *pVnode) { diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index b5b8726a4c9115541c666ad0c66531700ae885d5..f806a7b2125491bf2f7beb6f2242200d43e4c0ae 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -150,7 +150,7 @@ typedef struct SQWorkerMgmt { SHashObj *ctxHash; //key: queryId+taskId, value: SQWTaskCtx void *nodeObj; putReqToQueryQFp putToQueueFp; - sendReqToDnodeFp sendReqFp; + sendReqFp sendReqFp; } SQWorkerMgmt; #define QW_FPARAMS_DEF SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 42890ab38a09753ae1607d2cc58cfdf9ec080f1f..73611a5ed9480eef852f01044b510d9015bf0e78 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1443,7 +1443,7 @@ _return: } int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj, - putReqToQueryQFp fp1, sendReqToDnodeFp fp2) { + putReqToQueryQFp fp1, sendReqFp fp2) { if (NULL == qWorkerMgmt || NULL == nodeObj || NULL == fp1 || NULL == fp2) { qError("invalid param to init qworker"); QW_RET(TSDB_CODE_QRY_INVALID_INPUT); diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index c46989dc5dbeda6e5b774cc817ed7a97713f8087..ee1bfcd1375c909bf9b8b5071912b032e55517ce 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -539,9 +539,9 @@ static STfsDisk *tfsNextDisk(STfs *pTfs, SDiskIter *pIter) { return pDisk; } -int32_t tfsGetMonitorInfo(STfs *pTfs, SMonDiskInfo *pInfo) { +void tfsGetMonitorInfo(STfs *pTfs, SMonDiskInfo *pInfo) { pInfo->datadirs = taosArrayInit(32, sizeof(SMonDiskDesc)); - if (pInfo->datadirs == NULL) return -1; + if (pInfo->datadirs == NULL) return; tfsUpdateSize(pTfs);