提交 d4d1c0a9 编写于 作者: L Liu Jicong

add stream exec head

上级 552db887
...@@ -197,6 +197,11 @@ typedef struct { ...@@ -197,6 +197,11 @@ typedef struct {
}; };
} SMsgHead; } SMsgHead;
typedef struct {
int32_t workerType;
int32_t streamTaskId;
} SStreamExecMsgHead;
// Submit message for one table // Submit message for one table
typedef struct SSubmitBlk { typedef struct SSubmitBlk {
int64_t uid; // table unique id int64_t uid; // table unique id
...@@ -2310,7 +2315,7 @@ typedef struct { ...@@ -2310,7 +2315,7 @@ typedef struct {
} SStreamTaskDeployRsp; } SStreamTaskDeployRsp;
typedef struct { typedef struct {
SMsgHead head; SStreamExecMsgHead head;
// TODO: other info needed by task // TODO: other info needed by task
} SStreamTaskExecReq; } SStreamTaskExecReq;
......
...@@ -448,6 +448,11 @@ typedef struct { ...@@ -448,6 +448,11 @@ typedef struct {
#define SND_UNIQUE_THREAD_NUM 2 #define SND_UNIQUE_THREAD_NUM 2
#define SND_SHARED_THREAD_NUM 2 #define SND_SHARED_THREAD_NUM 2
enum {
SND_WORKER_TYPE__SHARED = 1,
SND_WORKER_TYPE__UNIQUE,
};
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -24,10 +24,15 @@ extern "C" { ...@@ -24,10 +24,15 @@ extern "C" {
int32_t dndInitSnode(SDnode *pDnode); int32_t dndInitSnode(SDnode *pDnode);
void dndCleanupSnode(SDnode *pDnode); void dndCleanupSnode(SDnode *pDnode);
void dndProcessSnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); // void dndProcessSnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dndProcessCreateSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t dndProcessCreateSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
void dndProcessSnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessSnodeSharedMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessSnodeExecMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -382,6 +382,12 @@ static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg) { ...@@ -382,6 +382,12 @@ static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg) {
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
static FORCE_INLINE int32_t dndGetSWTypeFromMsg(SRpcMsg *pMsg) {
SStreamExecMsgHead *pHead = pMsg->pCont;
pHead->workerType = htonl(pHead->workerType);
return pHead->workerType;
}
static FORCE_INLINE int32_t dndGetSWIdFromMsg(SRpcMsg *pMsg) { static FORCE_INLINE int32_t dndGetSWIdFromMsg(SRpcMsg *pMsg) {
SMsgHead *pHead = pMsg->pCont; SMsgHead *pHead = pMsg->pCont;
pHead->streamTaskId = htonl(pHead->streamTaskId); pHead->streamTaskId = htonl(pHead->streamTaskId);
...@@ -450,6 +456,18 @@ void dndProcessSnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -450,6 +456,18 @@ void dndProcessSnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
dndWriteSnodeMsgToMgmtWorker(pDnode, pMsg); dndWriteSnodeMsgToMgmtWorker(pDnode, pMsg);
} }
void dndProcessSnodeExecMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SSnode *pSnode = dndAcquireSnode(pDnode);
if (pSnode != NULL) {
int32_t workerType = dndGetSWTypeFromMsg(pMsg);
if (workerType == SND_WORKER_TYPE__SHARED) {
dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.sharedWorker, pMsg);
} else {
dndWriteSnodeMsgToWorkerByMsg(pDnode, pMsg);
}
}
}
void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
dndWriteSnodeMsgToWorkerByMsg(pDnode, pMsg); dndWriteSnodeMsgToWorkerByMsg(pDnode, pMsg);
} }
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "dndTransport.h" #include "dndTransport.h"
#include "dndMgmt.h" #include "dndMgmt.h"
#include "dndMnode.h" #include "dndMnode.h"
#include "dndSnode.h"
#include "dndVnodes.h" #include "dndVnodes.h"
#define INTERNAL_USER "_dnd" #define INTERNAL_USER "_dnd"
...@@ -153,10 +154,14 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { ...@@ -153,10 +154,14 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CONSUME)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CONSUME)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_HEARTBEAT)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_HEARTBEAT)] = dndProcessVnodeFetchMsg;
// Requests handled by SNODE
pMgmt->msgFp[TMSG_INDEX(TDMT_SND_TASK_DEPLOY)] = dndProcessSnodeMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_SND_TASK_EXEC)] = dndProcessSnodeExecMsg;
} }
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
SDnode * pDnode = parent; SDnode *pDnode = parent;
STransMgmt *pMgmt = &pDnode->tmgmt; STransMgmt *pMgmt = &pDnode->tmgmt;
tmsg_t msgType = pRsp->msgType; tmsg_t msgType = pRsp->msgType;
...@@ -219,7 +224,7 @@ static void dndCleanupClient(SDnode *pDnode) { ...@@ -219,7 +224,7 @@ static void dndCleanupClient(SDnode *pDnode) {
} }
static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) { static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
SDnode * pDnode = param; SDnode *pDnode = param;
STransMgmt *pMgmt = &pDnode->tmgmt; STransMgmt *pMgmt = &pDnode->tmgmt;
tmsg_t msgType = pReq->msgType; tmsg_t msgType = pReq->msgType;
...@@ -313,7 +318,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char ...@@ -313,7 +318,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
SAuthReq authReq = {0}; SAuthReq authReq = {0};
tstrncpy(authReq.user, user, TSDB_USER_LEN); tstrncpy(authReq.user, user, TSDB_USER_LEN);
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq); int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
void * pReq = rpcMallocCont(contLen); void *pReq = rpcMallocCont(contLen);
tSerializeSAuthReq(pReq, contLen, &authReq); tSerializeSAuthReq(pReq, contLen, &authReq);
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528}; SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册