From 628a5bbadda9d61a666142a2bc7ab64c07f85e4f Mon Sep 17 00:00:00 2001 From: Shengliang Date: Wed, 11 May 2022 17:48:38 +0800 Subject: [PATCH] refactor: node mgmt --- source/dnode/mgmt/mgmt_snode/inc/smInt.h | 23 +++++---- source/dnode/mgmt/mgmt_snode/src/smHandle.c | 43 ++++++++++------ source/dnode/mgmt/mgmt_snode/src/smInt.c | 56 ++++++++++----------- source/dnode/mgmt/mgmt_snode/src/smWorker.c | 20 +++----- 4 files changed, 73 insertions(+), 69 deletions(-) diff --git a/source/dnode/mgmt/mgmt_snode/inc/smInt.h b/source/dnode/mgmt/mgmt_snode/inc/smInt.h index 9eb48af733..3a2a448633 100644 --- a/source/dnode/mgmt/mgmt_snode/inc/smInt.h +++ b/source/dnode/mgmt/mgmt_snode/inc/smInt.h @@ -26,9 +26,10 @@ extern "C" { typedef struct SSnodeMgmt { SSnode *pSnode; - SDnode *pDnode; - SMgmtWrapper *pWrapper; + SMsgCb msgCb; const char *path; + const char *name; + int32_t dnodeId; SRWLatch latch; int8_t uniqueWorkerInUse; SArray *uniqueWorkers; // SArray @@ -37,19 +38,19 @@ typedef struct SSnodeMgmt { } SSnodeMgmt; // smHandle.c -void smInitMsgHandle(SMgmtWrapper *pWrapper); -int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -int32_t smProcessGetMonSmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq); +SArray *smGetMsgHandles(); +int32_t smProcessCreateReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SNodeMsg *pReq); // smWorker.c int32_t smStartWorker(SSnodeMgmt *pMgmt); void smStopWorker(SSnodeMgmt *pMgmt); -int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -int32_t smProcessSharedMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -int32_t smProcessExecMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t smPutNodeMsgToUniqueQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t smPutNodeMsgToSharedQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t smPutNodeMsgToExecQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t smPutNodeMsgToMonitorQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 34aff001bc..41310aa4ec 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -16,12 +16,12 @@ #define _DEFAULT_SOURCE #include "smInt.h" -void smGetMonitorInfo(SMgmtWrapper *pWrapper, SMonSmInfo *smInfo) {} +static void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {} -int32_t smProcessGetMonSmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { +int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SNodeMsg *pReq) { SMonSmInfo smInfo = {0}; - smGetMonitorInfo(pWrapper, &smInfo); - dmGetMonitorSysInfo(&smInfo.sys); + smGetMonitorInfo(pMgmt, &smInfo); + dmGetSystemInfo(&smInfo.sys); monGetLogs(&smInfo.log); int32_t rspLen = tSerializeSMonSmInfo(NULL, 0, &smInfo); @@ -43,8 +43,7 @@ int32_t smProcessGetMonSmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { return 0; } -int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SDnode *pDnode = pWrapper->pDnode; +int32_t smProcessCreateReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { SRpcMsg *pReq = &pMsg->rpcMsg; SDCreateSnodeReq createReq = {0}; @@ -53,14 +52,14 @@ int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return -1; } - if (createReq.dnodeId != pDnode->data.dnodeId) { + if (createReq.dnodeId != pMgmt->dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to create snode since %s", terrstr()); return -1; } bool deployed = true; - if (dmWriteFile(pWrapper, deployed) != 0) { + if (dmWriteFile(pMgmt->path, pMgmt->name, deployed) != 0) { dError("failed to write snode file since %s", terrstr()); return -1; } @@ -68,8 +67,7 @@ int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return 0; } -int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SDnode *pDnode = pWrapper->pDnode; +int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { SRpcMsg *pReq = &pMsg->rpcMsg; SDDropSnodeReq dropReq = {0}; @@ -78,14 +76,14 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return -1; } - if (dropReq.dnodeId != pDnode->data.dnodeId) { + if (dropReq.dnodeId != pMgmt->dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to drop snode since %s", terrstr()); return -1; } bool deployed = false; - if (dmWriteFile(pWrapper, deployed) != 0) { + if (dmWriteFile(pMgmt->path, pMgmt->name, deployed) != 0) { dError("failed to write snode file since %s", terrstr()); return -1; } @@ -93,10 +91,23 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return 0; } -void smInitMsgHandle(SMgmtWrapper *pWrapper) { - dmSetMsgHandle(pWrapper, TDMT_MON_SM_INFO, smProcessMonitorMsg, 0); +SArray *smGetMsgHandles() { + int32_t code = -1; + SArray *pArray = taosArrayInit(4, sizeof(SMgmtHandle)); + if (pArray == NULL) goto _OVER; + + if (dmSetMgmtHandle(pArray, TDMT_MON_SM_INFO, smPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; // Requests handled by SNODE - dmSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, smProcessMgmtMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, smProcessExecMsg, 0); + if (dmSetMgmtHandle(pArray, TDMT_SND_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SND_TASK_EXEC, smPutNodeMsgToExecQueue, 0) == NULL) goto _OVER; + + code = 0; +_OVER: + if (code != 0) { + taosArrayDestroy(pArray); + return NULL; + } else { + return pArray; + } } diff --git a/source/dnode/mgmt/mgmt_snode/src/smInt.c b/source/dnode/mgmt/mgmt_snode/src/smInt.c index a3216ae373..5b9f2fc7f7 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smInt.c +++ b/source/dnode/mgmt/mgmt_snode/src/smInt.c @@ -17,34 +17,25 @@ #include "smInt.h" #include "libs/function/function.h" -static int32_t smRequire(SMgmtWrapper *pWrapper, bool *required) { return dmReadFile(pWrapper, required); } - -static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) { - SMsgCb msgCb = pMgmt->pDnode->data.msgCb; - msgCb.pWrapper = pMgmt->pWrapper; - pOption->msgCb = msgCb; +static int32_t smRequire(const SMgmtInputOpt *pInput, bool *required) { + return dmReadFile(pInput->path, pInput->name, required); } -static void smClose(SMgmtWrapper *pWrapper) { - SSnodeMgmt *pMgmt = pWrapper->pMgmt; - if (pMgmt == NULL) return; +static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) { pOption->msgCb = pMgmt->msgCb; } +static void smClose(SSnodeMgmt *pMgmt) { dInfo("snode-mgmt start to cleanup"); - - udfcClose(); - if (pMgmt->pSnode != NULL) { smStopWorker(pMgmt); sndClose(pMgmt->pSnode); pMgmt->pSnode = NULL; } - pWrapper->pMgmt = NULL; taosMemoryFree(pMgmt); dInfo("snode-mgmt is cleaned up"); } -int32_t smOpen(SMgmtWrapper *pWrapper) { +int32_t smOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { dInfo("snode-mgmt start to init"); SSnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SSnodeMgmt)); if (pMgmt == NULL) { @@ -52,42 +43,47 @@ int32_t smOpen(SMgmtWrapper *pWrapper) { return -1; } - pMgmt->path = pWrapper->path; - pMgmt->pDnode = pWrapper->pDnode; - pMgmt->pWrapper = pWrapper; - pWrapper->pMgmt = pMgmt; + pMgmt->path = pInput->path; + pMgmt->name = pInput->name; + pMgmt->dnodeId = pInput->dnodeId; + pMgmt->msgCb = pInput->msgCb; + pMgmt->msgCb.pMgmt = pMgmt; SSnodeOpt option = {0}; smInitOption(pMgmt, &option); pMgmt->pSnode = sndOpen(pMgmt->path, &option); if (pMgmt->pSnode == NULL) { dError("failed to open snode since %s", terrstr()); + smClose(pMgmt); return -1; } - dmReportStartup(pWrapper->pDnode, "snode-impl", "initialized"); + tmsgReportStartup("snode-impl", "initialized"); if (smStartWorker(pMgmt) != 0) { dError("failed to start snode worker since %s", terrstr()); + smClose(pMgmt); return -1; } - dmReportStartup(pWrapper->pDnode, "snode-worker", "initialized"); + tmsgReportStartup("snode-worker", "initialized"); if (udfcOpen() != 0) { dError("failed to open udfc in snode"); + smClose(pMgmt); + return -1; } + pOutput->pMgmt = pMgmt; return 0; } -void smInitWrapper(SMgmtWrapper *pWrapper) { - SMgmtFp mgmtFp = {0}; - mgmtFp.openFp = smOpen; - mgmtFp.closeFp = smClose; - mgmtFp.createFp = smProcessCreateReq; - mgmtFp.dropFp = smProcessDropReq; - mgmtFp.requiredFp = smRequire; +SMgmtFunc smGetMgmtFunc() { + SMgmtFunc mgmtFunc = {0}; + mgmtFunc.openFp = smOpen; + mgmtFunc.closeFp = (NodeCloseFp)smClose; + mgmtFunc.createFp = (NodeCreateFp)smProcessCreateReq; + mgmtFunc.dropFp = (NodeDropFp)smProcessDropReq; + mgmtFunc.requiredFp = smRequire; + mgmtFunc.getHandlesFp = smGetMsgHandles; - smInitMsgHandle(pWrapper); - pWrapper->name = "snode"; - pWrapper->fp = mgmtFp; + return mgmtFunc; } diff --git a/source/dnode/mgmt/mgmt_snode/src/smWorker.c b/source/dnode/mgmt/mgmt_snode/src/smWorker.c index 2ae439bbd6..7bee965a4b 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smWorker.c +++ b/source/dnode/mgmt/mgmt_snode/src/smWorker.c @@ -36,7 +36,7 @@ static void smProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { int32_t code = -1; if (pMsg->rpcMsg.msgType == TDMT_MON_SM_INFO) { - code = smProcessGetMonSmInfoReq(pMgmt->pWrapper, pMsg); + code = smProcessGetMonitorInfoReq(pMgmt, pMsg); } else { terrno = TSDB_CODE_MSG_NOT_PROCESSED; } @@ -163,8 +163,7 @@ static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) { return 0; } -int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SSnodeMgmt *pMgmt = pWrapper->pMgmt; +int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0); if (pWorker == NULL) { terrno = TSDB_CODE_INVALID_MSG; @@ -176,8 +175,7 @@ int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return 0; } -int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SSnodeMgmt *pMgmt = pWrapper->pMgmt; +int32_t smPutNodeMsgToMonitorQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { SSingleWorker *pWorker = &pMgmt->monitorWorker; dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); @@ -185,8 +183,7 @@ int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return 0; } -int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SSnodeMgmt *pMgmt = pWrapper->pMgmt; +int32_t smPutNodeMsgToUniqueQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg); SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index); if (pWorker == NULL) { @@ -199,8 +196,7 @@ int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return 0; } -int32_t smProcessSharedMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SSnodeMgmt *pMgmt = pWrapper->pMgmt; +int32_t smPutNodeMsgToSharedQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { SSingleWorker *pWorker = &pMgmt->sharedWorker; dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); @@ -208,11 +204,11 @@ int32_t smProcessSharedMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return 0; } -int32_t smProcessExecMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { +int32_t smPutNodeMsgToExecQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { int32_t workerType = smGetSWTypeFromMsg(&pMsg->rpcMsg); if (workerType == SND_WORKER_TYPE__SHARED) { - return smProcessSharedMsg(pWrapper, pMsg); + return smPutNodeMsgToSharedQueue(pMgmt, pMsg); } else { - return smProcessUniqueMsg(pWrapper, pMsg); + return smPutNodeMsgToUniqueQueue(pMgmt, pMsg); } } -- GitLab