提交 1c05650c 编写于 作者: S Shengliang Guan

shm

上级 cbef7465
...@@ -49,6 +49,7 @@ void smStopWorker(SSnodeMgmt *pMgmt); ...@@ -49,6 +49,7 @@ void smStopWorker(SSnodeMgmt *pMgmt);
int32_t smProcessMgmtMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t smProcessMgmtMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t smProcessUniqueMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t smProcessUniqueMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t smProcessSharedMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t smProcessSharedMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t smProcessExecMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -54,4 +54,8 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -54,4 +54,8 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
} }
} }
void smInitMsgHandles(SMgmtWrapper *pWrapper) {} void smInitMsgHandles(SMgmtWrapper *pWrapper) {
// Requests handled by SNODE
dndSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, (NodeMsgFp)smProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, (NodeMsgFp)smProcessExecMsg);
}
...@@ -86,6 +86,12 @@ static FORCE_INLINE int32_t smGetSWIdFromMsg(SRpcMsg *pMsg) { ...@@ -86,6 +86,12 @@ static FORCE_INLINE int32_t smGetSWIdFromMsg(SRpcMsg *pMsg) {
return pHead->streamTaskId % SND_UNIQUE_THREAD_NUM; return pHead->streamTaskId % SND_UNIQUE_THREAD_NUM;
} }
static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) {
SStreamExecMsgHead *pHead = pMsg->pCont;
pHead->workerType = htonl(pHead->workerType);
return pHead->workerType;
}
int32_t smProcessMgmtMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { int32_t smProcessMgmtMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnodeWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0); SDnodeWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0);
if (pWorker == NULL) { if (pWorker == NULL) {
...@@ -115,3 +121,12 @@ int32_t smProcessSharedMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -115,3 +121,12 @@ int32_t smProcessSharedMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
return dndWriteMsgToWorker(pWorker, pMsg); return dndWriteMsgToWorker(pWorker, pMsg);
} }
int32_t smProcessExecMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t workerType = smGetSWTypeFromMsg(&pMsg->rpcMsg);
if (workerType == SND_WORKER_TYPE__SHARED) {
return smProcessSharedMsg(pMgmt, pMsg);
} else {
return smProcessUniqueMsg(pMgmt, pMsg);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册