From cc0c0293ea06984683d38c6c6dfe42ccad156946 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 18 Mar 2022 16:39:51 +0800 Subject: [PATCH] vnode process task exec --- include/common/tmsgdef.h | 1 + source/dnode/mgmt/impl/src/dndSnode.c | 13 +++++-------- source/dnode/mgmt/impl/src/dndTransport.c | 3 +++ source/dnode/snode/src/snode.c | 6 +++--- source/dnode/vnode/inc/tq.h | 2 ++ source/dnode/vnode/src/tq/tq.c | 5 +++++ source/dnode/vnode/src/vnd/vnodeQuery.c | 6 +++--- 7 files changed, 22 insertions(+), 14 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index a596794b3d..73a78131dc 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -190,6 +190,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) + TD_DEF_MSG_TYPE(TDMT_VND_TASK_EXEC, "vnode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) diff --git a/source/dnode/mgmt/impl/src/dndSnode.c b/source/dnode/mgmt/impl/src/dndSnode.c index ea06c8c751..7f3c706d7e 100644 --- a/source/dnode/mgmt/impl/src/dndSnode.c +++ b/source/dnode/mgmt/impl/src/dndSnode.c @@ -457,14 +457,11 @@ void dndProcessSnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { } void dndProcessSnodeExecMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - SSnode *pSnode = dndAcquireSnode(pDnode); - if (pSnode != NULL) { - int32_t workerType = dndGetSWTypeFromMsg(pMsg); - if (workerType == SND_WORKER_TYPE__SHARED) { - dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.sharedWorker, pMsg); - } else { - dndWriteSnodeMsgToWorkerByMsg(pDnode, pMsg); - } + int32_t workerType = dndGetSWTypeFromMsg(pMsg); + if (workerType == SND_WORKER_TYPE__SHARED) { + dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.sharedWorker, pMsg); + } else { + dndWriteSnodeMsgToWorkerByMsg(pDnode, pMsg); } } diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 617b6c0fc3..0e47df0cb6 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -155,6 +155,9 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CONSUME)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_HEARTBEAT)] = dndProcessVnodeFetchMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TASK_DEPLOY)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TASK_EXEC)] = dndProcessVnodeFetchMsg; + // Requests handled by SNODE pMgmt->msgFp[TMSG_INDEX(TDMT_SND_TASK_DEPLOY)] = dndProcessSnodeMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_SND_TASK_EXEC)] = dndProcessSnodeExecMsg; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 2ecaeb00e9..cde5347baa 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -84,9 +84,9 @@ int32_t sndMetaRemoveTask(SStreamMeta *pMeta, int32_t taskId) { } static int32_t sndProcessTaskExecReq(SSnode *pSnode, SRpcMsg *pMsg) { - SMsgHead *pHead = pMsg->pCont; - int32_t taskId = pHead->streamTaskId; - SStreamTask *pTask = sndMetaGetTask(pSnode->pMeta, taskId); + SStreamExecMsgHead *pHead = pMsg->pCont; + int32_t taskId = pHead->streamTaskId; + SStreamTask *pTask = sndMetaGetTask(pSnode->pMeta, taskId); if (pTask == NULL) { return -1; } diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index 8d8ed2e427..6391eaffea 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -55,6 +55,8 @@ int tqCommit(STQ*); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessSetConnReq(STQ* pTq, char* msg); int32_t tqProcessRebReq(STQ* pTq, char* msg); +int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg); + int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); #ifdef __cplusplus diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8cdc250e8d..02fecb49b7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -433,3 +433,8 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { return 0; } + +int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg) { + // + return 0; +} diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index e8bc6873ab..5caeb4d93a 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -24,9 +24,7 @@ int vnodeQueryOpen(SVnode *pVnode) { (putReqToQueryQFp)vnodePutReqToVQueryQ, (sendReqToDnodeFp)vnodeSendReqToDnode); } -void vnodeQueryClose(SVnode *pVnode) { - qWorkerDestroy((void **)&pVnode->pQuery); -} +void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); } int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { vTrace("message in query queue is processing"); @@ -68,6 +66,8 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { return vnodeGetTableMeta(pVnode, pMsg); case TDMT_VND_CONSUME: return tqProcessPollReq(pVnode->pTq, pMsg); + case TDMT_VND_TASK_EXEC: + return tqProcessTaskExec(pVnode->pTq, pMsg); case TDMT_VND_QUERY_HEARTBEAT: return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg); default: -- GitLab