From 42fd474beaa826bb7c70d638baa18c7c080bb8b8 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 18 Oct 2021 10:39:52 +0800 Subject: [PATCH] add vnodeprocess msg function --- include/common/taosmsg.h | 38 ++++++++++++++++++++++++- include/server/vnode/tq/tq.h | 5 ++-- source/server/dnode/src/dnodeTrans.c | 11 +++++-- source/server/vnode/inc/vnodeReadMsg.h | 2 ++ source/server/vnode/inc/vnodeWriteMsg.h | 8 +++++- source/server/vnode/src/vnodeMain.c | 36 ++++++++++++++--------- source/server/vnode/src/vnodeRead.c | 3 ++ source/server/vnode/src/vnodeWrite.c | 18 ++++++++++-- 8 files changed, 99 insertions(+), 22 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 78f91cca64..66a02f350e 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -42,7 +42,9 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_ACK, "mq-ack" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_RESET, "mq-reset" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" ) @@ -121,7 +123,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" ) #ifndef TAOS_MESSAGE_C - TSDB_MSG_TYPE_MAX // 105 + TSDB_MSG_TYPE_MAX // 147 #endif }; @@ -958,6 +960,40 @@ typedef struct { char reserved2[64]; } SStartupStep; +// mq related +typedef struct { + +} SMqConnectReq; + +typedef struct { + +} SMqConnectRsp; + +typedef struct { + +} SMqDisconnectReq; + +typedef struct { + +} SMqDisconnectRsp; + +typedef struct { + +} SMqAckReq; + +typedef struct { + +} SMqAckRsp; + +typedef struct { + +} SMqResetReq; + +typedef struct { + +} SMqResetRsp; +//mq related end + typedef struct { /* data */ } SSubmitReq; diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index ef6a34ffa3..6e56e8256f 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -26,8 +26,9 @@ typedef struct tmqMsgHead { int32_t headLen; int32_t msgVer; int64_t cgId; - int32_t topicLen; - char topic[]; + int64_t topicId; + int32_t checksum; + int32_t msgType; } tmqMsgHead; //TODO: put msgs into common diff --git a/source/server/dnode/src/dnodeTrans.c b/source/server/dnode/src/dnodeTrans.c index a4409674f1..1739283f34 100644 --- a/source/server/dnode/src/dnodeTrans.c +++ b/source/server/dnode/src/dnodeTrans.c @@ -97,6 +97,9 @@ int32_t dnodeInitServer() { tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg; + /*tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessRead;*/ + SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = tsDnodeDnodePort; @@ -308,10 +311,12 @@ static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, c } int32_t dnodeInitShell() { - tsTrans.shellMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg; // the following message shall be treated as mnode write tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = mnodeProcessMsg; diff --git a/source/server/vnode/inc/vnodeReadMsg.h b/source/server/vnode/inc/vnodeReadMsg.h index a1efb729e1..1efc74d1af 100644 --- a/source/server/vnode/inc/vnodeReadMsg.h +++ b/source/server/vnode/inc/vnodeReadMsg.h @@ -36,6 +36,8 @@ typedef struct SReadMsg { int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead); +int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead); +int32_t vnodeProcessTqQueryMsg(SVnode *pVnode, SReadMsg *pRead); #ifdef __cplusplus } diff --git a/source/server/vnode/inc/vnodeWriteMsg.h b/source/server/vnode/inc/vnodeWriteMsg.h index 86cdba6946..9dbc4fe490 100644 --- a/source/server/vnode/inc/vnodeWriteMsg.h +++ b/source/server/vnode/inc/vnodeWriteMsg.h @@ -27,9 +27,15 @@ int32_t vnodeProcessDropTableReq(SVnode *pVnode, SDropTableReq *pReq, SDropTable int32_t vnodeProcessAlterTableReq(SVnode *pVnode, SAlterTableReq *pReq, SAlterTableRsp *pRsp); int32_t vnodeProcessDropStableReq(SVnode *pVnode, SDropStableReq *pReq, SDropStableRsp *pRsp); int32_t vnodeProcessUpdateTagValReq(SVnode *pVnode, SUpdateTagValReq *pReq, SUpdateTagValRsp *pRsp); +//mq related +int32_t vnodeProcessMqConnectReq(SVnode* pVnode, SMqConnectReq *pReq, SMqConnectRsp *pRsp); +int32_t vnodeProcessMqDisconnectReq(SVnode* pVnode, SMqConnectReq *pReq, SMqConnectRsp *pRsp); +int32_t vnodeProcessMqAckReq(SVnode* pVnode, SMqAckReq *pReq, SMqAckRsp *pRsp); +int32_t vnodeProcessMqResetReq(SVnode* pVnode, SMqResetReq *pReq, SMqResetRsp *pRsp); +//mq related end #ifdef __cplusplus } #endif -#endif /*_TD_VNODE_WRITE_MSG_H_*/ \ No newline at end of file +#endif /*_TD_VNODE_WRITE_MSG_H_*/ diff --git a/source/server/vnode/src/vnodeMain.c b/source/server/vnode/src/vnodeMain.c index d9c1a88d15..da1c1d7235 100644 --- a/source/server/vnode/src/vnodeMain.c +++ b/source/server/vnode/src/vnodeMain.c @@ -780,20 +780,30 @@ static void vnodeCleanupVnodes() { } static void vnodeInitMsgFp() { - tsVmain.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMgmtMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMgmtMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMgmtMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMgmtMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMgmtMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMgmtMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMgmtMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMgmtMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMgmtMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessWriteMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessWriteMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessWriteMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessWriteMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessWriteMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessWriteMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMgmtMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMgmtMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessWriteMsg; + //mq related + tsVmain.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessWriteMsg; + //mq related end + tsVmain.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg; + //mq related + tsVmain.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessReadMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessReadMsg; + //mq related end } void vnodeProcessMsg(SRpcMsg *pMsg) { diff --git a/source/server/vnode/src/vnodeRead.c b/source/server/vnode/src/vnodeRead.c index 39b6983b7d..0bf907c419 100644 --- a/source/server/vnode/src/vnodeRead.c +++ b/source/server/vnode/src/vnodeRead.c @@ -141,6 +141,9 @@ void vnodeProcessReadMsg(SRpcMsg *pMsg) { static void vnodeInitReadMsgFp() { tsVread.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg; tsVread.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg; + + tsVread.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessTqQueryMsg; + tsVread.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessConsumeMsg; } static int32_t vnodeProcessReadStart(SVnode *pVnode, SReadMsg *pRead, int32_t qtype) { diff --git a/source/server/vnode/src/vnodeWrite.c b/source/server/vnode/src/vnodeWrite.c index 3c2634a2cf..c103460241 100644 --- a/source/server/vnode/src/vnodeWrite.c +++ b/source/server/vnode/src/vnodeWrite.c @@ -179,6 +179,20 @@ static bool vnodeProcessWriteStart(SVnode *pVnode, SVnWriteMsg *pWrite, int32_t case TSDB_MSG_TYPE_UPDATE_TAG_VAL: pWrite->code = vnodeProcessUpdateTagValReq(pVnode, (void*)pHead->cont, NULL); break; + //mq related + case TSDB_MSG_TYPE_MQ_CONNECT: + pWrite->code = vnodeProcessMqConnectReq(pVnode, (void*)pHead->cont, NULL); + break; + case TSDB_MSG_TYPE_MQ_DISCONNECT: + pWrite->code = vnodeProcessMqDisconnectReq(pVnode, (void*)pHead->cont, NULL); + break; + case TSDB_MSG_TYPE_MQ_ACK: + pWrite->code = vnodeProcessMqAckReq(pVnode, (void*)pHead->cont, NULL); + break; + case TSDB_MSG_TYPE_MQ_RESET: + pWrite->code = vnodeProcessMqResetReq(pVnode, (void*)pHead->cont, NULL); + break; + //mq related end default: pWrite->code = TSDB_CODE_VND_MSG_NOT_PROCESSED; break; @@ -186,7 +200,7 @@ static bool vnodeProcessWriteStart(SVnode *pVnode, SVnWriteMsg *pWrite, int32_t if (pWrite->code < 0) return false; - // update fync + // update fsync return (pWrite->code == 0 && msgType != TSDB_MSG_TYPE_SUBMIT); } @@ -233,4 +247,4 @@ void vnodeCleanupWrite() { taos_queue vnodeAllocWriteQueue(SVnode *pVnode) { return tWriteWorkerAllocQueue(&tsVwrite.pool, pVnode); } -void vnodeFreeWriteQueue(taos_queue pQueue) { tWriteWorkerFreeQueue(&tsVwrite.pool, pQueue); } \ No newline at end of file +void vnodeFreeWriteQueue(taos_queue pQueue) { tWriteWorkerFreeQueue(&tsVwrite.pool, pQueue); } -- GitLab