From 6296188fd3acdf4e4d217a859df5057a8e962524 Mon Sep 17 00:00:00 2001 From: Shengliang Date: Wed, 11 May 2022 17:38:00 +0800 Subject: [PATCH] refactor: node mgmt --- include/common/tmsgcb.h | 7 +- source/common/src/tmsgcb.c | 4 +- source/dnode/mgmt/mgmt_mnode/inc/mmInt.h | 38 ++-- source/dnode/mgmt/mgmt_mnode/src/mmFile.c | 8 +- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 219 ++++++++++---------- source/dnode/mgmt/mgmt_mnode/src/mmInt.c | 115 +++++----- source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 76 +++---- 7 files changed, 219 insertions(+), 248 deletions(-) diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index b6c96bb2d1..fad159a270 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -37,8 +37,8 @@ typedef enum { QUEUE_MAX, } EQueueType; -typedef int32_t (*PutToQueueFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); -typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueType qtype); +typedef int32_t (*PutToQueueFp)(void *pMgmt, SRpcMsg* pReq); +typedef int32_t (*GetQueueSizeFp)(void *pMgmt, int32_t vgId, EQueueType qtype); typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, const SEpSet* epSet, SRpcMsg* pReq); typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, const SRpcMsg* pRsp); @@ -49,6 +49,8 @@ typedef void (*ReportStartup)(SMgmtWrapper* pWrapper, const char* name, const ch typedef struct { SMgmtWrapper* pWrapper; + void* pMgmt; + void* clientRpc; PutToQueueFp queueFps[QUEUE_MAX]; GetQueueSizeFp qsizeFp; SendReqFp sendReqFp; @@ -57,7 +59,6 @@ typedef struct { RegisterBrokenLinkArgFp registerBrokenLinkArgFp; ReleaseHandleFp releaseHandleFp; ReportStartup reportStartupFp; - void* clientRpc; } SMsgCb; void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb); diff --git a/source/common/src/tmsgcb.c b/source/common/src/tmsgcb.c index 42612cecb9..cdf7dbfda9 100644 --- a/source/common/src/tmsgcb.c +++ b/source/common/src/tmsgcb.c @@ -24,7 +24,7 @@ void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { tsDefaultMsgCb = *pMsgCb; } int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) { PutToQueueFp fp = pMsgCb->queueFps[qtype]; if (fp != NULL) { - return (*fp)(pMsgCb->pWrapper, pReq); + return (*fp)(pMsgCb->pMgmt, pReq); } else { terrno = TSDB_CODE_INVALID_PTR; return -1; @@ -34,7 +34,7 @@ int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) { int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) { GetQueueSizeFp fp = pMsgCb->qsizeFp; if (fp != NULL) { - return (*fp)(pMsgCb->pWrapper, vgId, qtype); + return (*fp)(pMsgCb->pMgmt, vgId, qtype); } else { terrno = TSDB_CODE_INVALID_PTR; return -1; diff --git a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h index 4d40d1fa28..125325b16c 100644 --- a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h @@ -17,7 +17,6 @@ #define _TD_DND_MNODE_INT_H_ #include "dmInt.h" - #include "mnode.h" #ifdef __cplusplus @@ -26,9 +25,10 @@ extern "C" { typedef struct SMnodeMgmt { SMnode *pMnode; - SDnode *pDnode; - SMgmtWrapper *pWrapper; + SMsgCb msgCb; const char *path; + const char *name; + int32_t dnodeId; SSingleWorker queryWorker; SSingleWorker readWorker; SSingleWorker writeWorker; @@ -41,33 +41,31 @@ typedef struct SMnodeMgmt { // mmFile.c int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed); -int32_t mmWriteFile(SMgmtWrapper *pWrapper, SDCreateMnodeReq *pReq, bool deployed); +int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed); // mmInt.c int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq); // mmHandle.c -void mmInitMsgHandle(SMgmtWrapper *pWrapper); -int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -int32_t mmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +SArray *mmGetMsgHandles(); +int32_t mmProcessCreateReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t mmProcessGetMonMmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq); -int32_t mmProcessGetMnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq); -void mmGetMnodeLoads(SMgmtWrapper *pWrapper, SMonMloadInfo *pInfo); +int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SNodeMsg *pReq); +int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SNodeMsg *pReq); // mmWorker.c int32_t mmStartWorker(SMnodeMgmt *pMgmt); void mmStopWorker(SMnodeMgmt *pMgmt); -int32_t mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -int32_t mmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -int32_t mmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); - -int32_t mmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); -int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); -int32_t mmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); -int32_t mmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); +int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t mmPutRpcMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc); +int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc); +int32_t mmPutRpcMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc); +int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c index 83c832a41e..bf3606a4c3 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c @@ -28,7 +28,6 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) { snprintf(file, sizeof(file), "%s%smnode.json", pMgmt->path, TD_DIRSEP); pFile = taosOpenFile(file, TD_FILE_READ); if (pFile == NULL) { - // dDebug("file %s not exist", file); code = 0; goto _OVER; } @@ -105,11 +104,11 @@ _OVER: return code; } -int32_t mmWriteFile(SMgmtWrapper *pWrapper, SDCreateMnodeReq *pReq, bool deployed) { +int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed) { char file[PATH_MAX] = {0}; char realfile[PATH_MAX] = {0}; - snprintf(file, sizeof(file), "%s%smnode.json.bak", pWrapper->path, TD_DIRSEP); - snprintf(realfile, sizeof(realfile), "%s%smnode.json", pWrapper->path, TD_DIRSEP); + snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP); + snprintf(realfile, sizeof(realfile), "%s%smnode.json", pMgmt->path, TD_DIRSEP); TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { @@ -125,7 +124,6 @@ int32_t mmWriteFile(SMgmtWrapper *pWrapper, SDCreateMnodeReq *pReq, bool deploye len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n"); - SMnodeMgmt *pMgmt = pWrapper->pMgmt; if (pReq != NULL || pMgmt != NULL) { int8_t replica = (pReq != NULL ? pReq->replica : pMgmt->replica); for (int32_t i = 0; i < replica; ++i) { diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index bb6e3aad83..7026a9bd12 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -16,15 +16,14 @@ #define _DEFAULT_SOURCE #include "mmInt.h" -void mmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonMmInfo *mmInfo) { - SMnodeMgmt *pMgmt = pWrapper->pMgmt; +static void mmGetMonitorInfo(SMnodeMgmt *pMgmt, SMonMmInfo *mmInfo) { mndGetMonitorInfo(pMgmt->pMnode, &mmInfo->cluster, &mmInfo->vgroup, &mmInfo->grant); } -int32_t mmProcessGetMonMmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { +int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SNodeMsg *pReq) { SMonMmInfo mmInfo = {0}; - mmGetMonitorInfo(pWrapper, &mmInfo); - dmGetMonitorSysInfo(&mmInfo.sys); + mmGetMonitorInfo(pMgmt, &mmInfo); + dmGetSystemInfo(&mmInfo.sys); monGetLogs(&mmInfo.log); int32_t rspLen = tSerializeSMonMmInfo(NULL, 0, &mmInfo); @@ -46,15 +45,14 @@ int32_t mmProcessGetMonMmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { return 0; } -void mmGetMnodeLoads(SMgmtWrapper *pWrapper, SMonMloadInfo *pInfo) { - SMnodeMgmt *pMgmt = pWrapper->pMgmt; +static void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) { pInfo->isMnode = 1; mndGetLoad(pMgmt->pMnode, &pInfo->load); } -int32_t mmProcessGetMnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { +int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SNodeMsg *pReq) { SMonMloadInfo mloads = {0}; - mmGetMnodeLoads(pWrapper, &mloads); + mmGetMnodeLoads(pMgmt, &mloads); int32_t rspLen = tSerializeSMonMloadInfo(NULL, 0, &mloads); if (rspLen < 0) { @@ -74,8 +72,7 @@ int32_t mmProcessGetMnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { return 0; } -int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SDnode *pDnode = pWrapper->pDnode; +int32_t mmProcessCreateReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { SRpcMsg *pReq = &pMsg->rpcMsg; SDCreateMnodeReq createReq = {0}; @@ -84,14 +81,14 @@ int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return -1; } - if (createReq.replica <= 1 || createReq.dnodeId != pDnode->data.dnodeId) { + if (createReq.replica <= 1 || createReq.dnodeId != pMgmt->dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to create mnode since %s", terrstr()); return -1; } bool deployed = true; - if (mmWriteFile(pWrapper, &createReq, deployed) != 0) { + if (mmWriteFile(pMgmt, &createReq, deployed) != 0) { dError("failed to write mnode file since %s", terrstr()); return -1; } @@ -99,8 +96,7 @@ int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return 0; } -int32_t mmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SDnode *pDnode = pWrapper->pDnode; +int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { SRpcMsg *pReq = &pMsg->rpcMsg; SDDropMnodeReq dropReq = {0}; @@ -109,14 +105,14 @@ int32_t mmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return -1; } - if (dropReq.dnodeId != pDnode->data.dnodeId) { + if (dropReq.dnodeId != pMgmt->dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to drop mnode since %s", terrstr()); return -1; } bool deployed = false; - if (mmWriteFile(pWrapper, NULL, deployed) != 0) { + if (mmWriteFile(pMgmt, NULL, deployed) != 0) { dError("failed to write mnode file since %s", terrstr()); return -1; } @@ -125,7 +121,6 @@ int32_t mmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { } int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SDnode *pDnode = pMgmt->pDnode; SRpcMsg *pReq = &pMsg->rpcMsg; SDAlterMnodeReq alterReq = {0}; @@ -134,104 +129,118 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return -1; } - if (pDnode->data.dnodeId != 0 && alterReq.dnodeId != pDnode->data.dnodeId) { + if (pMgmt->dnodeId != 0 && alterReq.dnodeId != pMgmt->dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; - dError("failed to alter mnode since %s, input:%d cur:%d", terrstr(), alterReq.dnodeId, pDnode->data.dnodeId); + dError("failed to alter mnode since %s, input:%d cur:%d", terrstr(), alterReq.dnodeId, pMgmt->dnodeId); return -1; } else { return mmAlter(pMgmt, &alterReq); } } -void mmInitMsgHandle(SMgmtWrapper *pWrapper) { - dmSetMsgHandle(pWrapper, TDMT_MON_MM_INFO, mmProcessMonitorMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MON_MM_LOAD, mmProcessMonitorMsg, 0); +SArray *mmGetMsgHandles() { + int32_t code = -1; + SArray *pArray = taosArrayInit(64, sizeof(SMgmtHandle)); + if (pArray == NULL) goto _OVER; + + if (dmSetMgmtHandle(pArray, TDMT_MON_MM_INFO, mmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MON_MM_LOAD, mmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; // Requests handled by DNODE - dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, mmProcessWriteMsg, 0); + if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_SNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_BNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; // Requests handled by MNODE - dmSetMsgHandle(pWrapper, TDMT_MND_CONNECT, mmProcessReadMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, mmProcessReadMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_QNODE_LIST, mmProcessReadMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_USE_DB, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_SMA, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_DROP_SMA, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, mmProcessReadMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, mmProcessReadMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE, mmProcessReadMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_STATUS, mmProcessReadMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_GRANT, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_AUTH, mmProcessReadMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_MQ_ASK_EP, mmProcessReadMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE_RSP, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_GET_DB_CFG, mmProcessReadMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_MND_GET_INDEX, mmProcessReadMsg, 0); + if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_ACCT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_ACCT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_USER, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_USER, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_USER, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_AUTH, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_DNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_QNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_QNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_QNODE_LIST, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_BNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_BNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_USE_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_COMPACT_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_STB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SMA, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_META, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_VGROUP_LIST, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_QUERY, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_TRANS, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_SUBSCRIBE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_COMMIT_OFFSET, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_ASK_EP, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GET_DB_CFG, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; // Requests handled by VNODE - dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA_RSP, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA_RSP, mmProcessWriteMsg, 0); - - dmSetMsgHandle(pWrapper, TDMT_VND_QUERY, mmProcessQueryMsg, 1); - dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, mmProcessQueryMsg, 1); - dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, mmProcessQueryMsg, 1); - dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, mmProcessQueryMsg, 1); - dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, mmProcessQueryMsg, 1); - - dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_VNODE_RSP, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_VNODE_RSP, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_COMPACT_VNODE_RSP, mmProcessWriteMsg, 0); + if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + + if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; + + if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + + code = 0; + +_OVER: + if (code != 0) { + taosArrayDestroy(pArray); + return NULL; + } else { + return pArray; + } } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index 0f9158971b..8815af2f22 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -17,45 +17,35 @@ #include "mmInt.h" #include "wal.h" -static bool mmDeployRequired(SDnode *pDnode) { - if (pDnode->data.dnodeId > 0) return false; - if (pDnode->data.clusterId > 0) return false; - if (strcmp(pDnode->data.localEp, pDnode->data.firstEp) != 0) return false; +static bool mmDeployRequired(const SMgmtInputOpt *pInput) { + if (pInput->dnodeId > 0) return false; + if (pInput->clusterId > 0) return false; + if (strcmp(pInput->localEp, pInput->firstEp) != 0) return false; return true; } -static int32_t mmRequire(SMgmtWrapper *pWrapper, bool *required) { +static int32_t mmRequire(const SMgmtInputOpt *pInput, bool *required) { SMnodeMgmt mgmt = {0}; - mgmt.path = pWrapper->path; + mgmt.path = pInput->path; if (mmReadFile(&mgmt, required) != 0) { return -1; } if (!(*required)) { - *required = mmDeployRequired(pWrapper->pDnode); + *required = mmDeployRequired(pInput); } return 0; } -static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { - SMsgCb msgCb = pMgmt->pDnode->data.msgCb; - msgCb.pWrapper = pMgmt->pWrapper; - msgCb.queueFps[QUERY_QUEUE] = mmPutMsgToQueryQueue; - msgCb.queueFps[READ_QUEUE] = mmPutMsgToReadQueue; - msgCb.queueFps[WRITE_QUEUE] = mmPutMsgToWriteQueue; - msgCb.queueFps[SYNC_QUEUE] = mmPutMsgToWriteQueue; - pOption->msgCb = msgCb; -} - -static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { - mmInitOption(pMgmt, pOption); +static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInput, SMnodeOpt *pOption) { + pOption->msgCb = pMgmt->msgCb; pOption->replica = 1; pOption->selfIndex = 0; SReplica *pReplica = &pOption->replicas[0]; pReplica->id = 1; - pReplica->port = pMgmt->pDnode->data.serverPort; - tstrncpy(pReplica->fqdn, pMgmt->pDnode->data.localFqdn, TSDB_FQDN_LEN); + pReplica->port = pInput->serverPort; + tstrncpy(pReplica->fqdn, pInput->localFqdn, TSDB_FQDN_LEN); pOption->deploy = true; pMgmt->selfIndex = pOption->selfIndex; @@ -64,7 +54,7 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { } static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { - mmInitOption(pMgmt, pOption); + pOption->msgCb = pMgmt->msgCb; pOption->selfIndex = pMgmt->selfIndex; pOption->replica = pMgmt->replica; memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); @@ -72,8 +62,7 @@ static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { } static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) { - mmInitOption(pMgmt, pOption); - + pOption->msgCb = pMgmt->msgCb; pOption->replica = pCreate->replica; pOption->selfIndex = -1; for (int32_t i = 0; i < pCreate->replica; ++i) { @@ -81,7 +70,7 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre pReplica->id = pCreate->replicas[i].id; pReplica->port = pCreate->replicas[i].port; memcpy(pReplica->fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN); - if (pReplica->id == pMgmt->pDnode->data.dnodeId) { + if (pReplica->id == pMgmt->dnodeId) { pOption->selfIndex = i; } } @@ -109,7 +98,7 @@ int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq) { } bool deployed = true; - if (mmWriteFile(pMgmt->pWrapper, pReq, deployed) != 0) { + if (mmWriteFile(pMgmt, pReq, deployed) != 0) { dError("failed to write mnode file since %s", terrstr()); return -1; } @@ -117,10 +106,7 @@ int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq) { return 0; } -static void mmClose(SMgmtWrapper *pWrapper) { - SMnodeMgmt *pMgmt = pWrapper->pMgmt; - if (pMgmt == NULL) return; - +static void mmClose(SMnodeMgmt *pMgmt) { dInfo("mnode-mgmt start to cleanup"); if (pMgmt->pMnode != NULL) { mmStopWorker(pMgmt); @@ -128,12 +114,11 @@ static void mmClose(SMgmtWrapper *pWrapper) { pMgmt->pMnode = NULL; } - pWrapper->pMgmt = NULL; taosMemoryFree(pMgmt); dInfo("mnode-mgmt is cleaned up"); } -static int32_t mmOpen(SMgmtWrapper *pWrapper) { +static int32_t mmOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { dInfo("mnode-mgmt start to init"); if (walInit() != 0) { dError("failed to init wal since %s", terrstr()); @@ -146,23 +131,28 @@ static int32_t mmOpen(SMgmtWrapper *pWrapper) { return -1; } - pMgmt->path = pWrapper->path; - pMgmt->pDnode = pWrapper->pDnode; - pMgmt->pWrapper = pWrapper; - pWrapper->pMgmt = pMgmt; + pMgmt->path = pInput->path; + pMgmt->name = pInput->name; + pMgmt->dnodeId = pInput->dnodeId; + pMgmt->msgCb = pInput->msgCb; + pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)mmPutRpcMsgToQueryQueue; + pMgmt->msgCb.queueFps[READ_QUEUE] = (PutToQueueFp)mmPutRpcMsgToReadQueue; + pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue; + pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue; + pMgmt->msgCb.pMgmt = pMgmt; bool deployed = false; if (mmReadFile(pMgmt, &deployed) != 0) { dError("failed to read file since %s", terrstr()); - mmClose(pWrapper); + mmClose(pMgmt); return -1; } SMnodeOpt option = {0}; if (!deployed) { dInfo("mnode start to deploy"); - pWrapper->pDnode->data.dnodeId = 1; - mmBuildOptionForDeploy(pMgmt, &option); + pMgmt->dnodeId = 1; + mmBuildOptionForDeploy(pMgmt, pInput, &option); } else { dInfo("mnode start to open"); mmBuildOptionForOpen(pMgmt, &option); @@ -171,55 +161,52 @@ static int32_t mmOpen(SMgmtWrapper *pWrapper) { pMgmt->pMnode = mndOpen(pMgmt->path, &option); if (pMgmt->pMnode == NULL) { dError("failed to open mnode since %s", terrstr()); - mmClose(pWrapper); + mmClose(pMgmt); return -1; } - dmReportStartup(pWrapper->pDnode, "mnode-impl", "initialized"); + tmsgReportStartup("mnode-impl", "initialized"); if (mmStartWorker(pMgmt) != 0) { dError("failed to start mnode worker since %s", terrstr()); - mmClose(pWrapper); + mmClose(pMgmt); return -1; } - dmReportStartup(pWrapper->pDnode, "mnode-worker", "initialized"); + tmsgReportStartup("mnode-worker", "initialized"); if (!deployed) { deployed = true; - if (mmWriteFile(pWrapper, NULL, deployed) != 0) { + if (mmWriteFile(pMgmt, NULL, deployed) != 0) { dError("failed to write mnode file since %s", terrstr()); return -1; } } + pOutput->dnodeId = pMgmt->dnodeId; + pOutput->pMgmt = pMgmt; dInfo("mnode-mgmt is initialized"); return 0; } -static int32_t mmStart(SMgmtWrapper *pWrapper) { +static int32_t mmStart(SMnodeMgmt *pMgmt) { dDebug("mnode-mgmt start to run"); - SMnodeMgmt *pMgmt = pWrapper->pMgmt; return mndStart(pMgmt->pMnode); } -static void mmStop(SMgmtWrapper *pWrapper) { +static void mmStop(SMnodeMgmt *pMgmt) { dDebug("mnode-mgmt start to stop"); - SMnodeMgmt *pMgmt = pWrapper->pMgmt; - if (pMgmt != NULL) { - mndStop(pMgmt->pMnode); - } + mndStop(pMgmt->pMnode); } -void mmInitWrapper(SMgmtWrapper *pWrapper) { - SMgmtFp mgmtFp = {0}; - mgmtFp.openFp = mmOpen; - mgmtFp.closeFp = mmClose; - mgmtFp.startFp = mmStart; - mgmtFp.stopFp = mmStop; - mgmtFp.createFp = mmProcessCreateReq; - mgmtFp.dropFp = mmProcessDropReq; - mgmtFp.requiredFp = mmRequire; - - mmInitMsgHandle(pWrapper); - pWrapper->name = "mnode"; - pWrapper->fp = mgmtFp; +SMgmtFunc mmGetMgmtFunc() { + SMgmtFunc mgmtFunc = {0}; + mgmtFunc.openFp = mmOpen; + mgmtFunc.closeFp = (NodeCloseFp)mmClose; + mgmtFunc.startFp = (NodeStartFp)mmStart; + mgmtFunc.stopFp = (NodeStopFp)mmStop; + mgmtFunc.createFp = (NodeCreateFp)mmProcessCreateReq; + mgmtFunc.dropFp = (NodeDropFp)mmProcessDropReq; + mgmtFunc.requiredFp = mmRequire; + mgmtFunc.getHandlesFp = mmGetMsgHandles; + + return mgmtFunc; } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index aac5bbc16a..52756aa7ce 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -30,9 +30,9 @@ static inline void mmSendRsp(SNodeMsg *pMsg, int32_t code) { static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { SMnodeMgmt *pMgmt = pInfo->ahandle; - - int32_t code = -1; - tmsg_t msgType = pMsg->rpcMsg.msgType; + int32_t code = -1; + tmsg_t msgType = pMsg->rpcMsg.msgType; + bool isRequest = msgType & 1U; dTrace("msg:%p, get from mnode queue", pMsg); switch (msgType) { @@ -40,17 +40,17 @@ static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { code = mmProcessAlterReq(pMgmt, pMsg); break; case TDMT_MON_MM_INFO: - code = mmProcessGetMonMmInfoReq(pMgmt->pWrapper, pMsg); + code = mmProcessGetMonitorInfoReq(pMgmt, pMsg); break; case TDMT_MON_MM_LOAD: - code = mmProcessGetMnodeLoadsReq(pMgmt->pWrapper, pMsg); + code = mmProcessGetLoadsReq(pMgmt, pMsg); break; default: pMsg->pNode = pMgmt->pMnode; code = mndProcessMsg(pMsg); } - if (msgType & 1U) { + if (isRequest) { if (pMsg->rpcMsg.handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && terrno != 0) code = terrno; mmSendRsp(pMsg, code); @@ -64,62 +64,46 @@ static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { static void mmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { SMnodeMgmt *pMgmt = pInfo->ahandle; - + int32_t code = -1; + tmsg_t msgType = pMsg->rpcMsg.msgType; + bool isRequest = msgType & 1U; dTrace("msg:%p, get from mnode-query queue", pMsg); - SRpcMsg *pRpc = &pMsg->rpcMsg; - int32_t code = -1; pMsg->pNode = pMgmt->pMnode; code = mndProcessMsg(pMsg); - if (pRpc->msgType & 1U) { - if (pRpc->handle != NULL && code != 0) { - dError("msg:%p, failed to process since %s", pMsg, terrstr()); + if (isRequest) { + if (pMsg->rpcMsg.handle != NULL && code != 0) { + if (code != 0 && terrno != 0) code = terrno; mmSendRsp(pMsg, code); } } dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); - rpcFreeCont(pRpc->pCont); + rpcFreeCont(pMsg->rpcMsg.pCont); taosFreeQitem(pMsg); } -static void mmPutNodeMsgToWorker(SSingleWorker *pWorker, SNodeMsg *pMsg) { - dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); +static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SNodeMsg *pMsg) { + dTrace("msg:%p, put into worker %s, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->rpcMsg.msgType)); taosWriteQitem(pWorker->queue, pMsg); -} - -int32_t mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SMnodeMgmt *pMgmt = pWrapper->pMgmt; - mmPutNodeMsgToWorker(&pMgmt->writeWorker, pMsg); return 0; } -int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SMnodeMgmt *pMgmt = pWrapper->pMgmt; - mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg); - return 0; -} +int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->writeWorker, pMsg); } -int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SMnodeMgmt *pMgmt = pWrapper->pMgmt; - mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg); - return 0; -} +int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg); } -int32_t mmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SMnodeMgmt *pMgmt = pWrapper->pMgmt; - mmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg); - return 0; +int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg); } + +int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg); } -int32_t mmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SMnodeMgmt *pMgmt = pWrapper->pMgmt; - mmPutNodeMsgToWorker(&pMgmt->monitorWorker, pMsg); - return 0; +int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { + return mmPutNodeMsgToWorker(&pMgmt->monitorWorker, pMsg); } -static int32_t mmPutRpcMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pRpc) { +static inline int32_t mmPutRpcMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pRpc) { SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); if (pMsg == NULL) return -1; @@ -129,25 +113,19 @@ static int32_t mmPutRpcMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pRpc) { return 0; } -int32_t mmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { - SMnodeMgmt *pMgmt = pWrapper->pMgmt; +int32_t mmPutRpcMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc) { return mmPutRpcMsgToWorker(&pMgmt->queryWorker, pRpc); } -int32_t mmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { - SMnodeMgmt *pMgmt = pWrapper->pMgmt; +int32_t mmPutRpcMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc) { return mmPutRpcMsgToWorker(&pMgmt->writeWorker, pRpc); } -int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { - SMnodeMgmt *pMgmt = pWrapper->pMgmt; +int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc) { return mmPutRpcMsgToWorker(&pMgmt->readWorker, pRpc); } -int32_t mmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { - SMnodeMgmt *pMgmt = pWrapper->pMgmt; - return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pRpc); -} +int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc) { return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pRpc); } int32_t mmStartWorker(SMnodeMgmt *pMgmt) { SSingleWorkerCfg qCfg = { -- GitLab