diff --git a/include/common/tmsg.h b/include/common/tmsg.h index bb03e96df4220069934eee20a4d74a8fb0b505c2..1621b175dddb3bb8c22db10531fa42f35703f832 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -197,6 +197,11 @@ typedef struct { }; } SMsgHead; +typedef struct { + int32_t workerType; + int32_t streamTaskId; +} SStreamExecMsgHead; + // Submit message for one table typedef struct SSubmitBlk { int64_t uid; // table unique id @@ -1891,9 +1896,9 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) return buf; } typedef struct { - int8_t version; // for compatibility(default 0) - int8_t intervalUnit; // MACRO: TIME_UNIT_XXX - int8_t slidingUnit; // MACRO: TIME_UNIT_XXX + int8_t version; // for compatibility(default 0) + int8_t intervalUnit; // MACRO: TIME_UNIT_XXX + int8_t slidingUnit; // MACRO: TIME_UNIT_XXX char indexName[TSDB_INDEX_NAME_LEN]; char timezone[TD_TIMEZONE_LEN]; // sma data expired if timezone changes. int32_t exprLen; @@ -1901,7 +1906,7 @@ typedef struct { int64_t indexUid; tb_uid_t tableUid; // super/child/common table uid int64_t interval; - int64_t offset; // use unit by precision of DB + int64_t offset; // use unit by precision of DB int64_t sliding; char* expr; // sma expression char* tagsFilter; @@ -2310,7 +2315,7 @@ typedef struct { } SStreamTaskDeployRsp; typedef struct { - SMsgHead head; + SStreamExecMsgHead head; // TODO: other info needed by task } SStreamTaskExecReq; diff --git a/include/util/tdef.h b/include/util/tdef.h index 41a61ceb55bbb2edb72315297cb005ce55394e96..f6c4ae8d42323a450e5ac5cb720a37503aa2b25d 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -448,6 +448,11 @@ typedef struct { #define SND_UNIQUE_THREAD_NUM 2 #define SND_SHARED_THREAD_NUM 2 +enum { + SND_WORKER_TYPE__SHARED = 1, + SND_WORKER_TYPE__UNIQUE, +}; + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/impl/inc/dndSnode.h b/source/dnode/mgmt/impl/inc/dndSnode.h index b21e9191e8fbedc064f7b9823cec1e68acb70d90..f72d2a137a304d5e9058e84d140f077a15a42c9f 100644 --- a/source/dnode/mgmt/impl/inc/dndSnode.h +++ b/source/dnode/mgmt/impl/inc/dndSnode.h @@ -24,12 +24,17 @@ extern "C" { int32_t dndInitSnode(SDnode *pDnode); void dndCleanupSnode(SDnode *pDnode); -void dndProcessSnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); +// void dndProcessSnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); int32_t dndProcessCreateSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); +void dndProcessSnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); +void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); +void dndProcessSnodeSharedMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); +void dndProcessSnodeExecMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); + #ifdef __cplusplus } #endif -#endif /*_TD_DND_SNODE_H_*/ \ No newline at end of file +#endif /*_TD_DND_SNODE_H_*/ diff --git a/source/dnode/mgmt/impl/src/dndSnode.c b/source/dnode/mgmt/impl/src/dndSnode.c index 8667952f2c71a5febe171e0eae6af4e61ad725f4..ea06c8c7519e48b881faa0f2e5aca8e64072dad3 100644 --- a/source/dnode/mgmt/impl/src/dndSnode.c +++ b/source/dnode/mgmt/impl/src/dndSnode.c @@ -382,6 +382,12 @@ static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg) { taosFreeQitem(pMsg); } +static FORCE_INLINE int32_t dndGetSWTypeFromMsg(SRpcMsg *pMsg) { + SStreamExecMsgHead *pHead = pMsg->pCont; + pHead->workerType = htonl(pHead->workerType); + return pHead->workerType; +} + static FORCE_INLINE int32_t dndGetSWIdFromMsg(SRpcMsg *pMsg) { SMsgHead *pHead = pMsg->pCont; pHead->streamTaskId = htonl(pHead->streamTaskId); @@ -450,6 +456,18 @@ void dndProcessSnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { dndWriteSnodeMsgToMgmtWorker(pDnode, pMsg); } +void dndProcessSnodeExecMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { + SSnode *pSnode = dndAcquireSnode(pDnode); + if (pSnode != NULL) { + int32_t workerType = dndGetSWTypeFromMsg(pMsg); + if (workerType == SND_WORKER_TYPE__SHARED) { + dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.sharedWorker, pMsg); + } else { + dndWriteSnodeMsgToWorkerByMsg(pDnode, pMsg); + } + } +} + void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { dndWriteSnodeMsgToWorkerByMsg(pDnode, pMsg); } diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 15db36477d27f845bf4ca3bb1d03d9050c911f38..617b6c0fc374678d29f16c5f74e1876c44aa4d01 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -23,10 +23,11 @@ #include "dndTransport.h" #include "dndMgmt.h" #include "dndMnode.h" +#include "dndSnode.h" #include "dndVnodes.h" -#define INTERNAL_USER "_dnd" -#define INTERNAL_CKEY "_key" +#define INTERNAL_USER "_dnd" +#define INTERNAL_CKEY "_key" #define INTERNAL_SECRET "_pwd" static void dndInitMsgFp(STransMgmt *pMgmt) { @@ -153,10 +154,14 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CONSUME)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_HEARTBEAT)] = dndProcessVnodeFetchMsg; + + // Requests handled by SNODE + pMgmt->msgFp[TMSG_INDEX(TDMT_SND_TASK_DEPLOY)] = dndProcessSnodeMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_SND_TASK_EXEC)] = dndProcessSnodeExecMsg; } static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { - SDnode * pDnode = parent; + SDnode *pDnode = parent; STransMgmt *pMgmt = &pDnode->tmgmt; tmsg_t msgType = pRsp->msgType; @@ -219,7 +224,7 @@ static void dndCleanupClient(SDnode *pDnode) { } static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) { - SDnode * pDnode = param; + SDnode *pDnode = param; STransMgmt *pMgmt = &pDnode->tmgmt; tmsg_t msgType = pReq->msgType; @@ -313,7 +318,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char SAuthReq authReq = {0}; tstrncpy(authReq.user, user, TSDB_USER_LEN); int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq); - void * pReq = rpcMallocCont(contLen); + void *pReq = rpcMallocCont(contLen); tSerializeSAuthReq(pReq, contLen, &authReq); SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};