提交 cc0c0293 编写于 作者: L Liu Jicong

vnode process task exec

上级 d4d1c0a9
...@@ -190,6 +190,7 @@ enum { ...@@ -190,6 +190,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp) TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_EXEC, "vnode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
......
...@@ -457,15 +457,12 @@ void dndProcessSnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -457,15 +457,12 @@ void dndProcessSnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
} }
void dndProcessSnodeExecMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessSnodeExecMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SSnode *pSnode = dndAcquireSnode(pDnode);
if (pSnode != NULL) {
int32_t workerType = dndGetSWTypeFromMsg(pMsg); int32_t workerType = dndGetSWTypeFromMsg(pMsg);
if (workerType == SND_WORKER_TYPE__SHARED) { if (workerType == SND_WORKER_TYPE__SHARED) {
dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.sharedWorker, pMsg); dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.sharedWorker, pMsg);
} else { } else {
dndWriteSnodeMsgToWorkerByMsg(pDnode, pMsg); dndWriteSnodeMsgToWorkerByMsg(pDnode, pMsg);
} }
}
} }
void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
......
...@@ -155,6 +155,9 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { ...@@ -155,6 +155,9 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CONSUME)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CONSUME)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_HEARTBEAT)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_HEARTBEAT)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TASK_DEPLOY)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TASK_EXEC)] = dndProcessVnodeFetchMsg;
// Requests handled by SNODE // Requests handled by SNODE
pMgmt->msgFp[TMSG_INDEX(TDMT_SND_TASK_DEPLOY)] = dndProcessSnodeMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_SND_TASK_DEPLOY)] = dndProcessSnodeMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_SND_TASK_EXEC)] = dndProcessSnodeExecMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_SND_TASK_EXEC)] = dndProcessSnodeExecMsg;
......
...@@ -84,7 +84,7 @@ int32_t sndMetaRemoveTask(SStreamMeta *pMeta, int32_t taskId) { ...@@ -84,7 +84,7 @@ int32_t sndMetaRemoveTask(SStreamMeta *pMeta, int32_t taskId) {
} }
static int32_t sndProcessTaskExecReq(SSnode *pSnode, SRpcMsg *pMsg) { static int32_t sndProcessTaskExecReq(SSnode *pSnode, SRpcMsg *pMsg) {
SMsgHead *pHead = pMsg->pCont; SStreamExecMsgHead *pHead = pMsg->pCont;
int32_t taskId = pHead->streamTaskId; int32_t taskId = pHead->streamTaskId;
SStreamTask *pTask = sndMetaGetTask(pSnode->pMeta, taskId); SStreamTask *pTask = sndMetaGetTask(pSnode->pMeta, taskId);
if (pTask == NULL) { if (pTask == NULL) {
......
...@@ -55,6 +55,8 @@ int tqCommit(STQ*); ...@@ -55,6 +55,8 @@ int tqCommit(STQ*);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessSetConnReq(STQ* pTq, char* msg); int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
int32_t tqProcessRebReq(STQ* pTq, char* msg); int32_t tqProcessRebReq(STQ* pTq, char* msg);
int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg);
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -433,3 +433,8 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -433,3 +433,8 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
return 0; return 0;
} }
int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg) {
//
return 0;
}
...@@ -24,9 +24,7 @@ int vnodeQueryOpen(SVnode *pVnode) { ...@@ -24,9 +24,7 @@ int vnodeQueryOpen(SVnode *pVnode) {
(putReqToQueryQFp)vnodePutReqToVQueryQ, (sendReqToDnodeFp)vnodeSendReqToDnode); (putReqToQueryQFp)vnodePutReqToVQueryQ, (sendReqToDnodeFp)vnodeSendReqToDnode);
} }
void vnodeQueryClose(SVnode *pVnode) { void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); }
qWorkerDestroy((void **)&pVnode->pQuery);
}
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in query queue is processing"); vTrace("message in query queue is processing");
...@@ -68,6 +66,8 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -68,6 +66,8 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return vnodeGetTableMeta(pVnode, pMsg); return vnodeGetTableMeta(pVnode, pMsg);
case TDMT_VND_CONSUME: case TDMT_VND_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg); return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_VND_TASK_EXEC:
return tqProcessTaskExec(pVnode->pTq, pMsg);
case TDMT_VND_QUERY_HEARTBEAT: case TDMT_VND_QUERY_HEARTBEAT:
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg); return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg);
default: default:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册