From 69a4417a0c5b971b8a29494847d865c66ae8e742 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 14 Oct 2021 16:50:07 +0800 Subject: [PATCH] add some msg for tq (#8241) --- include/common/taosmsg.h | 6 +++- include/server/vnode/tq/tq.h | 54 ++++++++++++++++++++++++++++-- source/server/vnode/tq/inc/tqInt.h | 7 ++-- source/server/vnode/tq/src/tq.c | 2 +- 4 files changed, 62 insertions(+), 7 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 8f89df40d0..78f91cca64 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -41,6 +41,10 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_ACK, "mq-ack" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_RESET, "mq-reset" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" ) @@ -113,7 +117,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" ) // message for topic TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_TP, "create-tp" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_TP, "use-tp" ) +//TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_TP, "use-tp" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" ) #ifndef TAOS_MESSAGE_C diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index eb9c57c581..ef6a34ffa3 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -22,6 +22,56 @@ extern "C" { #endif +typedef struct tmqMsgHead { + int32_t headLen; + int32_t msgVer; + int64_t cgId; + int32_t topicLen; + char topic[]; +} tmqMsgHead; + +//TODO: put msgs into common +typedef struct tmqConnectReq { + tmqMsgHead head; + +} tmqConnectReq; + +typedef struct tmqConnectResp { + +} tmqConnectResp; + +typedef struct tmqDisconnectReq { + +} tmqDisconnectReq; + +typedef struct tmqDisconnectResp { + +} tmqDiconnectResp; + +typedef struct tmqConsumeReq { + +} tmqConsumeReq; + +typedef struct tmqConsumeResp { + +} tmqConsumeResp; + +typedef struct tmqSubscribeReq { + +} tmqSubscribeReq; + +typedef struct tmqSubscribeResp { + +} tmqSubscribeResp; + +typedef struct tmqHeartbeatReq { + +} tmqHeartbeatReq; + +typedef struct tmqHeartbeatResp { + +} tmqHeartbeatResp; + typedef struct tqTopicVhandle { //name // @@ -29,7 +79,7 @@ typedef struct tqTopicVhandle { // //callback for mnode // -} tqTopic; +} tqTopicVhandle; typedef struct STQ { //the set for topics @@ -50,7 +100,7 @@ int tqPushMsg(STQ*, void* msg, int64_t version); int tqCommit(STQ*); //void* will be replace by a msg type -int tqHandleMsg(STQ*, void* msg); +int tqHandleConsumeMsg(STQ*, tmqConsumeReq* msg); #ifdef __cplusplus } diff --git a/source/server/vnode/tq/inc/tqInt.h b/source/server/vnode/tq/inc/tqInt.h index c42bcfef43..cba9075fe9 100644 --- a/source/server/vnode/tq/inc/tqInt.h +++ b/source/server/vnode/tq/inc/tqInt.h @@ -26,14 +26,15 @@ extern "C" { typedef struct tqBufferItem { int64_t offset; - void *content; + void* executor; + void* content; } tqBufferItem; typedef struct tqGroupHandle { - char* topic; - void* ahandle; + char* topic; //c style, end with '\0' int64_t cgId; + void* ahandle; int64_t consumeOffset; int32_t head; int32_t tail; diff --git a/source/server/vnode/tq/src/tq.c b/source/server/vnode/tq/src/tq.c index 2ef2a4b6ea..7733ac29b5 100644 --- a/source/server/vnode/tq/src/tq.c +++ b/source/server/vnode/tq/src/tq.c @@ -77,7 +77,7 @@ int tqCommit(STQ* pTq) { return 0; } -int tqHandleMsg(STQ* pTq, void*msg) { +int tqHandleConsumeMsg(STQ* pTq, tmqConsumeReq* msg) { //parse msg and extract topic and cgId //lookup handle //confirm message and send to consumer -- GitLab