From e10e8c8fec85c9de26954ce2eb8f0ecbeeff0239 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 16 Mar 2022 18:20:57 +0800 Subject: [PATCH] shm --- include/common/tmsg.h | 7 +++ include/dnode/mgmt/dnode.h | 2 +- include/libs/transport/trpc.h | 4 -- source/dnode/mgmt/container/src/dndNode.c | 4 +- source/dnode/mgmt/dnode/src/dmMsg.c | 5 --- source/dnode/mgmt/vnode/inc/vmInt.h | 7 +-- source/dnode/mgmt/vnode/inc/vmMsg.h | 10 ++--- source/dnode/mgmt/vnode/inc/vmWorker.h | 2 + source/dnode/mgmt/vnode/src/vmMsg.c | 25 ++++++----- source/dnode/mgmt/vnode/src/vmWorker.c | 53 +++++++++++++++++++++++ 10 files changed, 87 insertions(+), 32 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f0718900c0..43a879fd8d 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2269,6 +2269,13 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p #pragma pack(pop) +struct SRpcMsg; +struct SEpSet; +typedef int32_t (*PutToQueueFp)(void *pMgmt, struct SRpcMsg *pReq); +typedef int32_t (*SendReqFp)(void *pMgmt, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); +typedef int32_t (*SendMnodeReqFp)(void *pMgmt, struct SRpcMsg *rpcMsg); +typedef int32_t (*SendRspFp)(void *pMgmt, struct SRpcMsg *rpcMsg); + #ifdef __cplusplus } #endif diff --git a/include/dnode/mgmt/dnode.h b/include/dnode/mgmt/dnode.h index a62cd06410..d2bf206bb2 100644 --- a/include/dnode/mgmt/dnode.h +++ b/include/dnode/mgmt/dnode.h @@ -50,7 +50,7 @@ typedef struct { int32_t numOfDisks; } SDnodeOpt; -typedef enum { DND_EVENT_STOP = 1, DND_EVENT_RELOAD } EDndEvent; +typedef enum { DND_EVENT_START, DND_EVENT_STOP = 1, DND_EVENT_RELOAD } EDndEvent; /** * @brief Initialize and start the dnode. diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 4de2284dc3..9c8c7a519f 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -53,10 +53,6 @@ typedef struct { void *pNode; } SNodeMsg; -typedef int32_t (*PutToQueueFp)(void *pMgmt, struct SRpcMsg *pReq); -typedef int32_t (*SendReqFp)(void *pMgmt, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); -typedef int32_t (*SendMnodeReqFp)(void *pMgmt, struct SRpcMsg *rpcMsg); -typedef int32_t (*SendRspFp)(void *pMgmt, struct SRpcMsg *rpcMsg); typedef struct SRpcInit { uint16_t localPort; // local port char * label; // for debug purpose diff --git a/source/dnode/mgmt/container/src/dndNode.c b/source/dnode/mgmt/container/src/dndNode.c index 39f5da9675..6dc3397082 100644 --- a/source/dnode/mgmt/container/src/dndNode.c +++ b/source/dnode/mgmt/container/src/dndNode.c @@ -68,8 +68,8 @@ static int32_t dndInitMemory(SDnode *pDnode, const SDnodeOpt *pOption) { pDnode->numOfDisks = pOption->numOfDisks; pDnode->rebootTime = taosGetTimestampMs(); - if (pDnode->dataDir == NULL || pDnode->dataDir == NULL || pDnode->dataDir == NULL || pDnode->dataDir == NULL || - pDnode->dataDir == NULL) { + if (pDnode->dataDir == NULL || pDnode->localEp == NULL || pDnode->localFqdn == NULL || pDnode->firstEp == NULL || + pDnode->secondEp == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } diff --git a/source/dnode/mgmt/dnode/src/dmMsg.c b/source/dnode/mgmt/dnode/src/dmMsg.c index 4a8ca025ab..79b84acc9b 100644 --- a/source/dnode/mgmt/dnode/src/dmMsg.c +++ b/source/dnode/mgmt/dnode/src/dmMsg.c @@ -123,11 +123,6 @@ void dmInitMsgHandles(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, (NodeMsgFp)dmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, (NodeMsgFp)dmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, (NodeMsgFp)dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, (NodeMsgFp)dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, (NodeMsgFp)dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, (NodeMsgFp)dmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, (NodeMsgFp)dmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, (NodeMsgFp)dmProcessMgmtMsg); diff --git a/source/dnode/mgmt/vnode/inc/vmInt.h b/source/dnode/mgmt/vnode/inc/vmInt.h index 3378cd4669..c91b243ed7 100644 --- a/source/dnode/mgmt/vnode/inc/vmInt.h +++ b/source/dnode/mgmt/vnode/inc/vmInt.h @@ -46,6 +46,7 @@ typedef struct SVnodesMgmt { SMnode *pMnode; SDnode *pDnode; SMgmtWrapper *pWrapper; + SDnodeWorker mgmtWorker; } SVnodesMgmt; typedef struct { @@ -96,11 +97,7 @@ void vmReleaseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl); void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); -void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); - +int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/vnode/inc/vmMsg.h b/source/dnode/mgmt/vnode/inc/vmMsg.h index b17d2b24b9..58e1d9d0a8 100644 --- a/source/dnode/mgmt/vnode/inc/vmMsg.h +++ b/source/dnode/mgmt/vnode/inc/vmMsg.h @@ -24,11 +24,11 @@ extern "C" { void vmInitMsgHandles(SMgmtWrapper *pWrapper); -int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq); -int32_t vmProcessAlterVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq); -int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq); -int32_t vmProcessSyncVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq); -int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq); +int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq); +int32_t vmProcessAlterVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq); +int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq); +int32_t vmProcessSyncVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq); +int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/vnode/inc/vmWorker.h b/source/dnode/mgmt/vnode/inc/vmWorker.h index 7278c4c83e..26e1a8f0d4 100644 --- a/source/dnode/mgmt/vnode/inc/vmWorker.h +++ b/source/dnode/mgmt/vnode/inc/vmWorker.h @@ -35,6 +35,8 @@ int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); + + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/vnode/src/vmMsg.c b/source/dnode/mgmt/vnode/src/vmMsg.c index 4888541f40..b4d51fed5c 100644 --- a/source/dnode/mgmt/vnode/src/vmMsg.c +++ b/source/dnode/mgmt/vnode/src/vmMsg.c @@ -55,7 +55,8 @@ static void vmGenerateWrapperCfg(SVnodesMgmt *pMgmt, SCreateVnodeReq *pCreate, S pCfg->vgVersion = pCreate->vgVersion; } -int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq) { +int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { + SRpcMsg *pReq = &pMsg->rpcMsg; SCreateVnodeReq createReq = {0}; if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -113,7 +114,8 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq) { return 0; } -int32_t vmProcessAlterVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq) { +int32_t vmProcessAlterVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { + SRpcMsg *pReq = &pMsg->rpcMsg; SAlterVnodeReq alterReq = {0}; if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -154,7 +156,8 @@ int32_t vmProcessAlterVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq) { return code; } -int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq) { +int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { + SRpcMsg *pReq = &pMsg->rpcMsg; SDropVnodeReq dropReq = {0}; if (tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -184,7 +187,8 @@ int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq) { return 0; } -int32_t vmProcessSyncVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq) { +int32_t vmProcessSyncVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { + SRpcMsg *pReq = &pMsg->rpcMsg; SSyncVnodeReq syncReq = {0}; tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &syncReq); @@ -207,7 +211,8 @@ int32_t vmProcessSyncVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq) { return 0; } -int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq) { +int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { + SRpcMsg *pReq = &pMsg->rpcMsg; SCompactVnodeReq compatcReq = {0}; tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &compatcReq); @@ -264,9 +269,9 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg); dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessCreateVnodeReq); - dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)vmProcessAlterVnodeReq); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, (NodeMsgFp)vmProcessDropVnodeReq); - dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, (NodeMsgFp)vmProcessSyncVnodeReq); - dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, (NodeMsgFp)vmProcessCompactVnodeReq); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessMgmtMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)vmProcessMgmtMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, (NodeMsgFp)vmProcessMgmtMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, (NodeMsgFp)vmProcessMgmtMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, (NodeMsgFp)vmProcessMgmtMsg); } diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index 3c42d7b094..b1dba6d000 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "vmWorker.h" +#include "vmMsg.h" static void vmProcessQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { vnodeProcessQueryMsg(pVnode->pImpl, pMsg); } @@ -200,6 +201,46 @@ void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { pVnode->pQueryQ = NULL; } +static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { + int32_t code = -1; + tmsg_t msgType = pMsg->rpcMsg.msgType; + dTrace("msg:%p, will be processed", pMsg); + + switch (msgType) { + case TDMT_DND_CREATE_VNODE: + code = vmProcessCreateVnodeReq(pMgmt, pMsg); + break; + case TDMT_DND_ALTER_VNODE: + code = vmProcessAlterVnodeReq(pMgmt, pMsg); + break; + case TDMT_DND_DROP_VNODE: + code = vmProcessDropVnodeReq(pMgmt, pMsg); + break; + case TDMT_DND_SYNC_VNODE: + code = vmProcessSyncVnodeReq(pMgmt, pMsg); + break; + case TDMT_DND_COMPACT_VNODE: + code = vmProcessCompactVnodeReq(pMgmt, pMsg); + break; + default: + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + code = -1; + dError("RPC %p, dnode msg:%s not processed", pMsg->rpcMsg.handle, TMSG_INFO(msgType)); + break; + } + + if (msgType & 1u) { + if (code != 0) code = terrno; + SRpcMsg rsp = {.code = code, .handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle}; + rpcSendResponse(&rsp); + } + + rpcFreeCont(pMsg->rpcMsg.pCont); + pMsg->rpcMsg.pCont = NULL; + taosFreeQitem(pMsg); + dTrace("msg:%p, is freed", pMsg); +} + int32_t vmStartWorker(SVnodesMgmt *pMgmt) { int32_t maxFetchThreads = 4; int32_t minFetchThreads = TMIN(maxFetchThreads, tsNumOfCores); @@ -230,14 +271,26 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { pWPool->max = maxSyncThreads; if (tWWorkerInit(pWPool) != 0) return -1; + if (dndInitWorker(pMgmt, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "vnode-mgmt", 1, 1, vmProcessMgmtQueue) != 0) { + dError("failed to start dnode mgmt worker since %s", terrstr()); + return -1; + } + dDebug("vnode workers is initialized"); return 0; } void vmStopWorker(SVnodesMgmt *pMgmt) { + dndCleanupWorker(&pMgmt->mgmtWorker); tFWorkerCleanup(&pMgmt->fetchPool); tQWorkerCleanup(&pMgmt->queryPool); tWWorkerCleanup(&pMgmt->writePool); tWWorkerCleanup(&pMgmt->syncPool); dDebug("vnode workers is closed"); } + +int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { + SDnodeWorker *pWorker = &pMgmt->mgmtWorker; + dTrace("msg:%p, will be written to worker %s", pMsg, pWorker->name); + return dndWriteMsgToWorker(pWorker, pMsg, 0); +} \ No newline at end of file -- GitLab