diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmInt.c b/source/dnode/mgmt/mgmt_qnode/src/qmInt.c index 6fef75f4b5fb79c23161ad886a37d6555d66182e..5e86a307321f583849de0f6c412f22a4da05a5ad 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmInt.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmInt.c @@ -61,6 +61,12 @@ static int32_t qmOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { } tmsgReportStartup("qnode-impl", "initialized"); + if (udfcOpen() != 0) { + dError("qnode can not open udfc"); + qmClose(pMgmt); + return -1; + } + if (qmStartWorker(pMgmt) != 0) { dError("failed to start qnode worker since %s", terrstr()); qmClose(pMgmt); diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 51b38604613d60fc817bbb1a91926a5eb9aca2cc..2ade6c129b88f1eb777dd58d29f74d02c78ded6c 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -25,14 +25,11 @@ extern "C" { #endif -typedef struct SVnodesMgmt { - SHashObj *hash; - SRWLatch latch; - SVnodesStat state; +typedef struct SVnodeMgmt { + SMsgCb msgCb; const char *path; - SDnode *pDnode; - SMgmtWrapper *pWrapper; - STfs *pTfs; + const char *name; + int32_t dnodeId; SQWorkerPool queryPool; SQWorkerPool fetchPool; SWWorkerPool syncPool; @@ -40,7 +37,11 @@ typedef struct SVnodesMgmt { SWWorkerPool mergePool; SSingleWorker mgmtWorker; SSingleWorker monitorWorker; -} SVnodesMgmt; + SHashObj *hash; + SRWLatch latch; + SVnodesStat state; + STfs *pTfs; +} SVnodeMgmt; typedef struct { int32_t vgId; @@ -52,22 +53,21 @@ typedef struct { } SWrapperCfg; typedef struct { - int32_t vgId; - int32_t refCount; - int32_t vgVersion; - int8_t dropped; - int8_t accessState; - uint64_t dbUid; - char *db; - char *path; - SVnode *pImpl; - STaosQueue *pWriteQ; - STaosQueue *pSyncQ; - STaosQueue *pApplyQ; - STaosQueue *pQueryQ; - STaosQueue *pFetchQ; - STaosQueue *pMergeQ; - SMgmtWrapper *pWrapper; + int32_t vgId; + int32_t refCount; + int32_t vgVersion; + int8_t dropped; + int8_t accessState; + uint64_t dbUid; + char *db; + char *path; + SVnode *pImpl; + STaosQueue *pWriteQ; + STaosQueue *pSyncQ; + STaosQueue *pApplyQ; + STaosQueue *pQueryQ; + STaosQueue *pFetchQ; + STaosQueue *pMergeQ; } SVnodeObj; typedef struct { @@ -76,50 +76,49 @@ typedef struct { int32_t failed; int32_t threadIndex; TdThread thread; - SVnodesMgmt *pMgmt; + SVnodeMgmt *pMgmt; SWrapperCfg *pCfgs; } SVnodeThread; // vmInt.c -SVnodeObj *vmAcquireVnode(SVnodesMgmt *pMgmt, int32_t vgId); -void vmReleaseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); -int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl); -void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); +SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId); +void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode); +int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl); +void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode); // vmHandle.c -void vmInitMsgHandle(SMgmtWrapper *pWrapper); -int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq); -int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq); -int32_t vmProcessGetMonVmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq); -int32_t vmProcessGetVnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq); -void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo); +SArray *vmGetMsgHandles(); +int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SNodeMsg *pReq); +int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SNodeMsg *pReq); +int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SNodeMsg *pReq); +int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SNodeMsg *pReq); // vmFile.c -int32_t vmGetVnodeListFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes); -int32_t vmWriteVnodeListToFile(SVnodesMgmt *pMgmt); -SVnodeObj **vmGetVnodeListFromHash(SVnodesMgmt *pMgmt, int32_t *numOfVnodes); +int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes); +int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt); +SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes); // vmWorker.c -int32_t vmStartWorker(SVnodesMgmt *pMgmt); -void vmStopWorker(SVnodesMgmt *pMgmt); -int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); -void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); - -int32_t vmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); -int32_t vmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); -int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); -int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); -int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); -int32_t vmPutMsgToMergeQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); -int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype); - -int32_t vmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -int32_t vmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -int32_t vmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -int32_t vmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -int32_t vmProcessMergeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrappert, SNodeMsg *pMsg); -int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t vmStartWorker(SVnodeMgmt *pMgmt); +void vmStopWorker(SVnodeMgmt *pMgmt); +int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode); +void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode); + +int32_t vmPutRpcMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmPutRpcMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmPutRpcMsgToApplyQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmPutRpcMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmPutRpcMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmPutRpcMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype); + +int32_t vmPutNodeMsgToWriteQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t vmPutNodeMsgToSyncQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t vmPutNodeMsgToQueryQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t vmPutNodeMsgToFetchQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t vmPutNodeMsgToMergeQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t vmPutNodeMsgToMgmtQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c index f251dd120e8e690c64045cbdd1b88ebb32f927e0..2505d2ff96a62734e1306d628919eb04e9cd6952 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "vmInt.h" -SVnodeObj **vmGetVnodeListFromHash(SVnodesMgmt *pMgmt, int32_t *numOfVnodes) { +SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) { taosRLockLatch(&pMgmt->latch); int32_t num = 0; @@ -44,7 +44,7 @@ SVnodeObj **vmGetVnodeListFromHash(SVnodesMgmt *pMgmt, int32_t *numOfVnodes) { return pVnodes; } -int32_t vmGetVnodeListFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) { +int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) { int32_t code = TSDB_CODE_INVALID_JSON_FORMAT; int32_t len = 0; int32_t maxLen = 30000; @@ -148,9 +148,9 @@ _OVER: return code; } -int32_t vmWriteVnodeListToFile(SVnodesMgmt *pMgmt) { - char file[PATH_MAX]; - char realfile[PATH_MAX]; +int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt) { + char file[PATH_MAX] = {0}; + char realfile[PATH_MAX] = {0}; snprintf(file, sizeof(file), "%s%svnodes.json.bak", pMgmt->path, TD_DIRSEP); snprintf(realfile, sizeof(file), "%s%svnodes.json", pMgmt->path, TD_DIRSEP); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index a8aa336ddbe06ee83ee63908ff9ec8c22086bf48..016e32a134e40c625ede7dcb3b7b9ca8e7985d30 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -16,9 +16,7 @@ #define _DEFAULT_SOURCE #include "vmInt.h" -void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo) { - SVnodesMgmt *pMgmt = pWrapper->pMgmt; - +static void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) { pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad)); if (pInfo->pVloads == NULL) return; @@ -39,11 +37,9 @@ void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo) { taosRUnLockLatch(&pMgmt->latch); } -void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *pInfo) { - SVnodesMgmt *pMgmt = pWrapper->pMgmt; - +static void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { SMonVloadInfo vloads = {0}; - vmGetVnodeLoads(pWrapper, &vloads); + vmGetVnodeLoads(pMgmt, &vloads); SArray *pVloads = vloads.pVloads; if (pVloads == NULL) return; @@ -86,10 +82,10 @@ void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *pInfo) { taosArrayDestroy(pVloads); } -int32_t vmProcessGetMonVmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { +int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SNodeMsg *pReq) { SMonVmInfo vmInfo = {0}; - vmGetMonitorInfo(pWrapper, &vmInfo); - dmGetMonitorSysInfo(&vmInfo.sys); + vmGetMonitorInfo(pMgmt, &vmInfo); + dmGetSystemInfo(&vmInfo.sys); monGetLogs(&vmInfo.log); int32_t rspLen = tSerializeSMonVmInfo(NULL, 0, &vmInfo); @@ -111,9 +107,9 @@ int32_t vmProcessGetMonVmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { return 0; } -int32_t vmProcessGetVnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { +int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SNodeMsg *pReq) { SMonVloadInfo vloads = {0}; - vmGetVnodeLoads(pWrapper, &vloads); + vmGetVnodeLoads(pMgmt, &vloads); int32_t rspLen = tSerializeSMonVloadInfo(NULL, 0, &vloads); if (rspLen < 0) { @@ -163,7 +159,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { } } -static void vmGenerateWrapperCfg(SVnodesMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) { +static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) { pCfg->vgId = pCreate->vgId; pCfg->vgVersion = pCreate->vgVersion; pCfg->dropped = 0; @@ -172,7 +168,7 @@ static void vmGenerateWrapperCfg(SVnodesMgmt *pMgmt, SCreateVnodeReq *pCreate, S snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCreate->vgId); } -int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) { SRpcMsg *pReq = &pMsg->rpcMsg; SCreateVnodeReq createReq = {0}; int32_t code = -1; @@ -207,17 +203,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return -1; } - SMsgCb msgCb = pMgmt->pDnode->data.msgCb; - msgCb.pWrapper = pMgmt->pWrapper; - msgCb.queueFps[WRITE_QUEUE] = vmPutMsgToWriteQueue; - msgCb.queueFps[SYNC_QUEUE] = vmPutMsgToSyncQueue; - msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; - msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; - msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; - msgCb.queueFps[MERGE_QUEUE] = vmPutMsgToMergeQueue; - msgCb.qsizeFp = vmGetQueueSize; - - SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, msgCb); + SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb); if (pImpl == NULL) { dError("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr()); goto _OVER; @@ -249,7 +235,7 @@ _OVER: return code; } -int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) { SRpcMsg *pReq = &pMsg->rpcMsg; SDropVnodeReq dropReq = {0}; if (tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { @@ -280,57 +266,71 @@ int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return 0; } -void vmInitMsgHandle(SMgmtWrapper *pWrapper) { - dmSetMsgHandle(pWrapper, TDMT_MON_VM_INFO, vmProcessMonitorMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MON_VM_LOAD, vmProcessMonitorMsg, 0); +SArray *mmGetMsgHandles() { + int32_t code = -1; + SArray *pArray = taosArrayInit(64, sizeof(SMgmtHandle)); + if (pArray == NULL) goto _OVER; + + if (dmSetMgmtHandle(pArray, TDMT_MON_VM_INFO, vmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MON_VM_LOAD, vmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; // Requests handled by VNODE - dmSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, vmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_QUERY, vmProcessQueryMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, vmProcessQueryMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, vmProcessFetchMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, vmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, vmProcessFetchMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, vmProcessFetchMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, vmProcessQueryMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, vmProcessQueryMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, vmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, vmProcessWriteMsg, 0); - // dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_RES_READY, vmProcessFetchMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, vmProcessFetchMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, vmProcessFetchMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, vmProcessFetchMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, vmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, vmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, vmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, vmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, vmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, vmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, vmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, vmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_SUBMIT_RSMA, vmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE, vmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, vmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, vmProcessFetchMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, vmProcessFetchMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, vmProcessMergeMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_TASK_WRITE_EXEC, vmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, vmProcessFetchMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_VNODE, vmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_COMPACT_VNODE, vmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, vmProcessMgmtMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, vmProcessMgmtMsg, 0); - - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_TIMEOUT, vmProcessSyncMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_PING, vmProcessSyncMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_PING_REPLY, vmProcessSyncMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_CLIENT_REQUEST, vmProcessSyncMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, vmProcessSyncMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_REQUEST_VOTE, vmProcessSyncMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, vmProcessSyncMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_APPEND_ENTRIES, vmProcessSyncMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, vmProcessSyncMsg, 0); + if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, vmPutNodeMsgToQueryQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, vmPutNodeMsgToQueryQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_CONSUME, vmPutNodeMsgToQueryQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_QUERY, vmPutNodeMsgToQueryQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_CONNECT, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_DISCONNECT, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + // if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_SET_CUR, vmPutNodeMsgToWriteQueue, 0)== NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_RES_READY, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TASKS_STATUS, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_TASK, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_SMA, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT_RSMA, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_PIPE_EXEC, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_MERGE_EXEC, vmPutNodeMsgToMergeQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_WRITE_EXEC, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_VNODE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_VNODE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; + + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_TIMEOUT, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING_REPLY, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER; + + code = 0; + +_OVER: + if (code != 0) { + taosArrayDestroy(pArray); + return NULL; + } else { + return pArray; + } } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index b0e0d79bdeb0b370411b6d930be383c247fb087f..354fbeda04fc5e0a61cb49d595d1707513a3f946 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "vmInt.h" -SVnodeObj *vmAcquireVnode(SVnodesMgmt *pMgmt, int32_t vgId) { +SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) { SVnodeObj *pVnode = NULL; int32_t refCount = 0; @@ -36,7 +36,7 @@ SVnodeObj *vmAcquireVnode(SVnodesMgmt *pMgmt, int32_t vgId) { return pVnode; } -void vmReleaseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { +void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { if (pVnode == NULL) return; taosRLockLatch(&pMgmt->latch); @@ -45,7 +45,7 @@ void vmReleaseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount); } -int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { +int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { SVnodeObj *pVnode = taosMemoryCalloc(1, sizeof(SVnodeObj)); if (pVnode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -61,7 +61,6 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { pVnode->db = tstrdup(pCfg->db); pVnode->path = tstrdup(pCfg->path); pVnode->pImpl = pImpl; - pVnode->pWrapper = pMgmt->pWrapper; if (pVnode->path == NULL || pVnode->db == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -80,7 +79,7 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { return code; } -void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { +void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { char path[TSDB_FILENAME_LEN] = {0}; taosWLockLatch(&pMgmt->latch); @@ -115,8 +114,7 @@ void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { static void *vmOpenVnodeInThread(void *param) { SVnodeThread *pThread = param; - SVnodesMgmt *pMgmt = pThread->pMgmt; - SDnode *pDnode = pMgmt->pDnode; + SVnodeMgmt *pMgmt = pThread->pMgmt; char path[TSDB_FILENAME_LEN]; dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum); @@ -128,19 +126,10 @@ static void *vmOpenVnodeInThread(void *param) { char stepDesc[TSDB_STEP_DESC_LEN] = {0}; snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId, pMgmt->state.openVnodes, pMgmt->state.totalVnodes); - dmReportStartup(pDnode, "vnode-open", stepDesc); - - SMsgCb msgCb = pMgmt->pDnode->data.msgCb; - msgCb.pWrapper = pMgmt->pWrapper; - msgCb.queueFps[WRITE_QUEUE] = vmPutMsgToWriteQueue; - msgCb.queueFps[SYNC_QUEUE] = vmPutMsgToSyncQueue; - msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; - msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; - msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; - msgCb.queueFps[MERGE_QUEUE] = vmPutMsgToMergeQueue; - msgCb.qsizeFp = vmGetQueueSize; + tmsgReportStartup("vnode-open", stepDesc); + snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId); - SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, msgCb); + SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb); if (pImpl == NULL) { dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); pThread->failed++; @@ -157,9 +146,7 @@ static void *vmOpenVnodeInThread(void *param) { return NULL; } -static int32_t vmOpenVnodes(SVnodesMgmt *pMgmt) { - SDnode *pDnode = pMgmt->pDnode; - +static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) { pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); if (pMgmt->hash == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -227,7 +214,7 @@ static int32_t vmOpenVnodes(SVnodesMgmt *pMgmt) { } } -static void vmCloseVnodes(SVnodesMgmt *pMgmt) { +static void vmCloseVnodes(SVnodeMgmt *pMgmt) { dInfo("start to close all vnodes"); int32_t numOfVnodes = 0; @@ -249,40 +236,44 @@ static void vmCloseVnodes(SVnodesMgmt *pMgmt) { dInfo("total vnodes:%d are all closed", numOfVnodes); } -static void vmCleanup(SMgmtWrapper *pWrapper) { - SVnodesMgmt *pMgmt = pWrapper->pMgmt; - if (pMgmt == NULL) return; - +static void vmCleanup(SVnodeMgmt *pMgmt) { dInfo("vnode-mgmt start to cleanup"); vmCloseVnodes(pMgmt); vmStopWorker(pMgmt); vnodeCleanup(); tfsClose(pMgmt->pTfs); taosMemoryFree(pMgmt); - pWrapper->pMgmt = NULL; dInfo("vnode-mgmt is cleaned up"); } -static int32_t vmInit(SMgmtWrapper *pWrapper) { - SDnode *pDnode = pWrapper->pDnode; - SVnodesMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodesMgmt)); - int32_t code = -1; - +static int32_t vmInit(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { dInfo("vnode-mgmt start to init"); + int32_t code = -1; + + SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt)); if (pMgmt == NULL) goto _OVER; - pMgmt->path = pWrapper->path; - pMgmt->pDnode = pWrapper->pDnode; - pMgmt->pWrapper = pWrapper; + pMgmt->path = pInput->path; + pMgmt->name = pInput->name; + pMgmt->dnodeId = pInput->dnodeId; + pMgmt->msgCb = pInput->msgCb; + pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)vmPutRpcMsgToWriteQueue; + pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)vmPutRpcMsgToSyncQueue; + pMgmt->msgCb.queueFps[APPLY_QUEUE] = (PutToQueueFp)vmPutRpcMsgToApplyQueue; + pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)vmPutRpcMsgToQueryQueue; + pMgmt->msgCb.queueFps[FETCH_QUEUE] = (PutToQueueFp)vmPutRpcMsgToFetchQueue; + pMgmt->msgCb.queueFps[MERGE_QUEUE] = (PutToQueueFp)vmPutRpcMsgToMergeQueue; + pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize; + pMgmt->msgCb.pMgmt = pMgmt; taosInitRWLatch(&pMgmt->latch); SDiskCfg dCfg = {0}; - tstrncpy(dCfg.dir, pDnode->data.dataDir, TSDB_FILENAME_LEN); + tstrncpy(dCfg.dir, pInput->dataDir, TSDB_FILENAME_LEN); dCfg.level = 0; dCfg.primary = 1; - SDiskCfg *pDisks = pDnode->data.disks; - int32_t numOfDisks = pDnode->data.numOfDisks; + SDiskCfg *pDisks = tsDiskCfg; + int32_t numOfDisks = tsDiskCfgNum; if (numOfDisks <= 0 || pDisks == NULL) { pDisks = &dCfg; numOfDisks = 1; @@ -293,64 +284,64 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) { dError("failed to init tfs since %s", terrstr()); goto _OVER; } - dmReportStartup(pDnode, "vnode-tfs", "initialized"); + tmsgReportStartup("vnode-tfs", "initialized"); if (walInit() != 0) { dError("failed to init wal since %s", terrstr()); goto _OVER; } - dmReportStartup(pDnode, "vnode-wal", "initialized"); + tmsgReportStartup("vnode-wal", "initialized"); if (syncInit() != 0) { dError("failed to open sync since %s", terrstr()); - return -1; + goto _OVER; } + tmsgReportStartup("vnode-sync", "initialized"); if (vnodeInit(tsNumOfCommitThreads) != 0) { dError("failed to init vnode since %s", terrstr()); goto _OVER; } - dmReportStartup(pDnode, "vnode-commit", "initialized"); + tmsgReportStartup("vnode-commit", "initialized"); if (vmStartWorker(pMgmt) != 0) { - dError("failed to init workers since %s", terrstr()) goto _OVER; + dError("failed to init workers since %s", terrstr()); + goto _OVER; } - dmReportStartup(pDnode, "vnode-worker", "initialized"); + tmsgReportStartup("vnode-worker", "initialized"); if (vmOpenVnodes(pMgmt) != 0) { dError("failed to open vnode since %s", terrstr()); - return -1; + goto _OVER; } - dmReportStartup(pDnode, "vnode-vnodes", "initialized"); + tmsgReportStartup("vnode-vnodes", "initialized"); if (udfcOpen() != 0) { dError("failed to open udfc in vnode"); + goto _OVER; } code = 0; _OVER: if (code == 0) { - pWrapper->pMgmt = pMgmt; + pOutput->pMgmt = pMgmt; dInfo("vnodes-mgmt is initialized"); } else { dError("failed to init vnodes-mgmt since %s", terrstr()); - vmCleanup(pWrapper); + vmCleanup(pMgmt); } - return 0; + return code; } -static int32_t vmRequire(SMgmtWrapper *pWrapper, bool *required) { - SDnode *pDnode = pWrapper->pDnode; - *required = pDnode->data.supportVnodes > 0; +static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) { + *required = pInput->supportVnodes > 0; return 0; } -static int32_t vmStart(SMgmtWrapper *pWrapper) { +static int32_t vmStart(SVnodeMgmt *pMgmt) { dDebug("vnode-mgmt start to run"); - SVnodesMgmt *pMgmt = pWrapper->pMgmt; - taosRLockLatch(&pMgmt->latch); void *pIter = taosHashIterate(pMgmt->hash, NULL); @@ -367,20 +358,18 @@ static int32_t vmStart(SMgmtWrapper *pWrapper) { return 0; } -static void vmStop(SMgmtWrapper *pWrapper) { +static void vmStop(SVnodeMgmt *pMgmt) { // process inside the vnode } -void vmInitWrapper(SMgmtWrapper *pWrapper) { - SMgmtFp mgmtFp = {0}; - mgmtFp.openFp = vmInit; - mgmtFp.closeFp = vmCleanup; - mgmtFp.startFp = vmStart; - mgmtFp.stopFp = vmStop; - mgmtFp.requiredFp = vmRequire; - - vmInitMsgHandle(pWrapper); - pWrapper->name = "vnode"; - pWrapper->fp = mgmtFp; -} +SMgmtFunc vmGetMgmtFunc() { + SMgmtFunc mgmtFunc = {0}; + mgmtFunc.openFp = vmInit; + mgmtFunc.closeFp = (NodeCloseFp)vmCleanup; + mgmtFunc.startFp = (NodeStartFp)vmStart; + mgmtFunc.stopFp = (NodeStopFp)vmStop; + mgmtFunc.requiredFp = vmRequire; + mgmtFunc.getHandlesFp = vmGetMsgHandles; + return mgmtFunc; +} diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 2baa8b8942c996f818fc2ad56ce8798933b5b0d0..00ad137db6e65019098ee31b185a88adff4afc6e 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -32,7 +32,7 @@ static inline void vmSendRsp(SNodeMsg *pMsg, int32_t code) { } static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { - SVnodesMgmt *pMgmt = pInfo->ahandle; + SVnodeMgmt *pMgmt = pInfo->ahandle; int32_t code = -1; tmsg_t msgType = pMsg->rpcMsg.msgType; @@ -40,10 +40,10 @@ static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { switch (msgType) { case TDMT_MON_VM_INFO: - code = vmProcessGetMonVmInfoReq(pMgmt->pWrapper, pMsg); + code = vmProcessGetMonitorInfoReq(pMgmt, pMsg); break; case TDMT_MON_VM_LOAD: - code = vmProcessGetVnodeLoadsReq(pMgmt->pWrapper, pMsg); + code = vmProcessGetLoadsReq(pMgmt, pMsg); break; case TDMT_DND_CREATE_VNODE: code = vmProcessCreateVnodeReq(pMgmt, pMsg); @@ -240,7 +240,7 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO } } -static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) { +static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) { SRpcMsg *pRpc = &pMsg->rpcMsg; SMsgHead *pHead = pRpc->pCont; int32_t code = 0; @@ -285,41 +285,34 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueTyp return code; } -int32_t vmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SVnodesMgmt *pMgmt = pWrapper->pMgmt; +int32_t vmPutNodeMsgToSyncQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); } -int32_t vmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SVnodesMgmt *pMgmt = pWrapper->pMgmt; +int32_t vmPutNodeMsgToWriteQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, WRITE_QUEUE); } -int32_t vmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SVnodesMgmt *pMgmt = pWrapper->pMgmt; +int32_t vmPutNodeMsgToQueryQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, QUERY_QUEUE); } -int32_t vmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SVnodesMgmt *pMgmt = pWrapper->pMgmt; +int32_t vmPutNodeMsgToFetchQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, FETCH_QUEUE); } -int32_t vmProcessMergeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SVnodesMgmt *pMgmt = pWrapper->pMgmt; +int32_t vmPutNodeMsgToMergeQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, MERGE_QUEUE); } -int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SVnodesMgmt *pMgmt = pWrapper->pMgmt; +int32_t vmPutNodeMsgToMgmtQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) { SSingleWorker *pWorker = &pMgmt->mgmtWorker; dTrace("msg:%p, will be put into vnode-mgmt queue, worker:%s", pMsg, pWorker->name); taosWriteQitem(pWorker->queue, pMsg); return 0; } -int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SVnodesMgmt *pMgmt = pWrapper->pMgmt; +int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) { SSingleWorker *pWorker = &pMgmt->monitorWorker; dTrace("msg:%p, will be put into vnode-monitor queue, worker:%s", pMsg, pWorker->name); @@ -327,9 +320,8 @@ int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return 0; } -static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueType qtype) { - SVnodesMgmt *pMgmt = pWrapper->pMgmt; - SMsgHead *pHead = pRpc->pCont; +static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType qtype) { + SMsgHead *pHead = pRpc->pCont; SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); if (pVnode == NULL) return -1; @@ -377,33 +369,31 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT return code; } -int32_t vmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { - return vmPutRpcMsgToQueue(pWrapper, pRpc, WRITE_QUEUE); +int32_t vmPutRpcMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) { + return vmPutRpcMsgToQueue(pMgmt, pRpc, WRITE_QUEUE); } -int32_t vmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { - return vmPutRpcMsgToQueue(pWrapper, pRpc, SYNC_QUEUE); -} +int32_t vmPutRpcMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) { return vmPutRpcMsgToQueue(pMgmt, pRpc, SYNC_QUEUE); } -int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { - return vmPutRpcMsgToQueue(pWrapper, pRpc, APPLY_QUEUE); +int32_t vmPutRpcMsgToApplyQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) { + return vmPutRpcMsgToQueue(pMgmt, pRpc, APPLY_QUEUE); } -int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { - return vmPutRpcMsgToQueue(pWrapper, pRpc, QUERY_QUEUE); +int32_t vmPutRpcMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) { + return vmPutRpcMsgToQueue(pMgmt, pRpc, QUERY_QUEUE); } -int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { - return vmPutRpcMsgToQueue(pWrapper, pRpc, FETCH_QUEUE); +int32_t vmPutRpcMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) { + return vmPutRpcMsgToQueue(pMgmt, pRpc, FETCH_QUEUE); } -int32_t vmPutMsgToMergeQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { - return vmPutRpcMsgToQueue(pWrapper, pRpc, MERGE_QUEUE); +int32_t vmPutRpcMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) { + return vmPutRpcMsgToQueue(pMgmt, pRpc, MERGE_QUEUE); } -int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) { +int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { int32_t size = -1; - SVnodeObj *pVnode = vmAcquireVnode(pWrapper->pMgmt, vgId); + SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId); if (pVnode != NULL) { switch (qtype) { case WRITE_QUEUE: @@ -428,11 +418,11 @@ int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) { break; } } - vmReleaseVnode(pWrapper->pMgmt, pVnode); + vmReleaseVnode(pMgmt, pVnode); return size; } -int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { +int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue); pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue); pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue); @@ -450,7 +440,7 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { return 0; } -void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { +void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ); tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ); @@ -466,7 +456,7 @@ void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { dDebug("vgId:%d, vnode queue is freed", pVnode->vgId); } -int32_t vmStartWorker(SVnodesMgmt *pMgmt) { +int32_t vmStartWorker(SVnodeMgmt *pMgmt) { SQWorkerPool *pQPool = &pMgmt->queryPool; pQPool->name = "vnode-query"; pQPool->min = tsNumOfVnodeQueryThreads; @@ -524,7 +514,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { return 0; } -void vmStopWorker(SVnodesMgmt *pMgmt) { +void vmStopWorker(SVnodeMgmt *pMgmt) { tSingleWorkerCleanup(&pMgmt->monitorWorker); tSingleWorkerCleanup(&pMgmt->mgmtWorker); tWWorkerCleanup(&pMgmt->writePool); diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index 54b29f546cc888af80adaf1ec13c3b1a8c4d3efb..1259363f9468c2667536a17652010914dd4e2eac 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -26,10 +26,6 @@ SQnode *qndOpen(const SQnodeOpt *pOption) { return NULL; } - if (udfcOpen() != 0) { - qError("qnode can not open udfc"); - } - if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, NULL, (void **)&pQnode->pQuery, &pOption->msgCb)) { taosMemoryFreeClear(pQnode); return NULL; @@ -41,9 +37,6 @@ SQnode *qndOpen(const SQnodeOpt *pOption) { void qndClose(SQnode *pQnode) { qWorkerDestroy((void **)&pQnode->pQuery); - - udfcClose(); - taosMemoryFree(pQnode); }