From 1c05650cdd6c246d7fb406ba7e3dc8b37418198e Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 18 Mar 2022 17:15:16 +0800 Subject: [PATCH] shm --- source/dnode/mgmt/snode/inc/smInt.h | 1 + source/dnode/mgmt/snode/src/smMsg.c | 6 +++++- source/dnode/mgmt/snode/src/smWorker.c | 15 +++++++++++++++ 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/source/dnode/mgmt/snode/inc/smInt.h b/source/dnode/mgmt/snode/inc/smInt.h index 54bdecb035..3def27b832 100644 --- a/source/dnode/mgmt/snode/inc/smInt.h +++ b/source/dnode/mgmt/snode/inc/smInt.h @@ -49,6 +49,7 @@ void smStopWorker(SSnodeMgmt *pMgmt); int32_t smProcessMgmtMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t smProcessUniqueMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t smProcessSharedMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t smProcessExecMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/snode/src/smMsg.c b/source/dnode/mgmt/snode/src/smMsg.c index ad325f336f..1bff5597bf 100644 --- a/source/dnode/mgmt/snode/src/smMsg.c +++ b/source/dnode/mgmt/snode/src/smMsg.c @@ -54,4 +54,8 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { } } -void smInitMsgHandles(SMgmtWrapper *pWrapper) {} +void smInitMsgHandles(SMgmtWrapper *pWrapper) { + // Requests handled by SNODE + dndSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, (NodeMsgFp)smProcessMgmtMsg); + dndSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, (NodeMsgFp)smProcessExecMsg); +} diff --git a/source/dnode/mgmt/snode/src/smWorker.c b/source/dnode/mgmt/snode/src/smWorker.c index 1bf07d2232..0c6f13c747 100644 --- a/source/dnode/mgmt/snode/src/smWorker.c +++ b/source/dnode/mgmt/snode/src/smWorker.c @@ -86,6 +86,12 @@ static FORCE_INLINE int32_t smGetSWIdFromMsg(SRpcMsg *pMsg) { return pHead->streamTaskId % SND_UNIQUE_THREAD_NUM; } +static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) { + SStreamExecMsgHead *pHead = pMsg->pCont; + pHead->workerType = htonl(pHead->workerType); + return pHead->workerType; +} + int32_t smProcessMgmtMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { SDnodeWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0); if (pWorker == NULL) { @@ -115,3 +121,12 @@ int32_t smProcessSharedMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); return dndWriteMsgToWorker(pWorker, pMsg); } + +int32_t smProcessExecMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { + int32_t workerType = smGetSWTypeFromMsg(&pMsg->rpcMsg); + if (workerType == SND_WORKER_TYPE__SHARED) { + return smProcessSharedMsg(pMgmt, pMsg); + } else { + return smProcessUniqueMsg(pMgmt, pMsg); + } +} -- GitLab