diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index a596794b3d9ac493f775bd2d6f6bb74f12075036..73a78131dc1cb6d81e85c3278328866d05ecd62a 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -190,6 +190,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) + TD_DEF_MSG_TYPE(TDMT_VND_TASK_EXEC, "vnode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) diff --git a/source/dnode/mgmt/impl/src/dndSnode.c b/source/dnode/mgmt/impl/src/dndSnode.c index ea06c8c7519e48b881faa0f2e5aca8e64072dad3..7f3c706d7e1d8c6e474a7be426951dd9eb58ae3c 100644 --- a/source/dnode/mgmt/impl/src/dndSnode.c +++ b/source/dnode/mgmt/impl/src/dndSnode.c @@ -457,14 +457,11 @@ void dndProcessSnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { } void dndProcessSnodeExecMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - SSnode *pSnode = dndAcquireSnode(pDnode); - if (pSnode != NULL) { - int32_t workerType = dndGetSWTypeFromMsg(pMsg); - if (workerType == SND_WORKER_TYPE__SHARED) { - dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.sharedWorker, pMsg); - } else { - dndWriteSnodeMsgToWorkerByMsg(pDnode, pMsg); - } + int32_t workerType = dndGetSWTypeFromMsg(pMsg); + if (workerType == SND_WORKER_TYPE__SHARED) { + dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.sharedWorker, pMsg); + } else { + dndWriteSnodeMsgToWorkerByMsg(pDnode, pMsg); } } diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 617b6c0fc374678d29f16c5f74e1876c44aa4d01..0e47df0cb6fbb1507f7d8446accdc73a213b8bf9 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -155,6 +155,9 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CONSUME)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_HEARTBEAT)] = dndProcessVnodeFetchMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TASK_DEPLOY)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TASK_EXEC)] = dndProcessVnodeFetchMsg; + // Requests handled by SNODE pMgmt->msgFp[TMSG_INDEX(TDMT_SND_TASK_DEPLOY)] = dndProcessSnodeMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_SND_TASK_EXEC)] = dndProcessSnodeExecMsg; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 2ecaeb00e90ee0d9e07e69c4e42abac360529f94..cde5347baa5fa47a62407fa41a15d9db71cfd996 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -84,9 +84,9 @@ int32_t sndMetaRemoveTask(SStreamMeta *pMeta, int32_t taskId) { } static int32_t sndProcessTaskExecReq(SSnode *pSnode, SRpcMsg *pMsg) { - SMsgHead *pHead = pMsg->pCont; - int32_t taskId = pHead->streamTaskId; - SStreamTask *pTask = sndMetaGetTask(pSnode->pMeta, taskId); + SStreamExecMsgHead *pHead = pMsg->pCont; + int32_t taskId = pHead->streamTaskId; + SStreamTask *pTask = sndMetaGetTask(pSnode->pMeta, taskId); if (pTask == NULL) { return -1; } diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index 8d8ed2e427087798c7e6da03be47743cec526bfa..6391eaffea70088b389a0e24834e151f8d075dd4 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -55,6 +55,8 @@ int tqCommit(STQ*); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessSetConnReq(STQ* pTq, char* msg); int32_t tqProcessRebReq(STQ* pTq, char* msg); +int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg); + int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); #ifdef __cplusplus diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8cdc250e8d5bbb9cbaac311a38ac2001144572a1..02fecb49b77c1fbf8257e6b2197a698edeb2c379 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -433,3 +433,8 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { return 0; } + +int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg) { + // + return 0; +} diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index e8bc6873ab0a34ed56d1455857be81c26a2ade54..5caeb4d93a5e9240603b4d3c773f8738dc54b84a 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -24,9 +24,7 @@ int vnodeQueryOpen(SVnode *pVnode) { (putReqToQueryQFp)vnodePutReqToVQueryQ, (sendReqToDnodeFp)vnodeSendReqToDnode); } -void vnodeQueryClose(SVnode *pVnode) { - qWorkerDestroy((void **)&pVnode->pQuery); -} +void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); } int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { vTrace("message in query queue is processing"); @@ -68,6 +66,8 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { return vnodeGetTableMeta(pVnode, pMsg); case TDMT_VND_CONSUME: return tqProcessPollReq(pVnode->pTq, pMsg); + case TDMT_VND_TASK_EXEC: + return tqProcessTaskExec(pVnode->pTq, pMsg); case TDMT_VND_QUERY_HEARTBEAT: return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg); default: