未验证 提交 69a4417a 编写于 作者: L Liu Jicong 提交者: GitHub

add some msg for tq (#8241)

上级 a1fbaf30
...@@ -41,6 +41,10 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" ) ...@@ -41,6 +41,10 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_ACK, "mq-ack" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_RESET, "mq-reset" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" )
...@@ -113,7 +117,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" ) ...@@ -113,7 +117,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" )
// message for topic // message for topic
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_TP, "create-tp" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_TP, "create-tp" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_TP, "use-tp" ) //TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_TP, "use-tp" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" )
#ifndef TAOS_MESSAGE_C #ifndef TAOS_MESSAGE_C
......
...@@ -22,6 +22,56 @@ ...@@ -22,6 +22,56 @@
extern "C" { extern "C" {
#endif #endif
typedef struct tmqMsgHead {
int32_t headLen;
int32_t msgVer;
int64_t cgId;
int32_t topicLen;
char topic[];
} tmqMsgHead;
//TODO: put msgs into common
typedef struct tmqConnectReq {
tmqMsgHead head;
} tmqConnectReq;
typedef struct tmqConnectResp {
} tmqConnectResp;
typedef struct tmqDisconnectReq {
} tmqDisconnectReq;
typedef struct tmqDisconnectResp {
} tmqDiconnectResp;
typedef struct tmqConsumeReq {
} tmqConsumeReq;
typedef struct tmqConsumeResp {
} tmqConsumeResp;
typedef struct tmqSubscribeReq {
} tmqSubscribeReq;
typedef struct tmqSubscribeResp {
} tmqSubscribeResp;
typedef struct tmqHeartbeatReq {
} tmqHeartbeatReq;
typedef struct tmqHeartbeatResp {
} tmqHeartbeatResp;
typedef struct tqTopicVhandle { typedef struct tqTopicVhandle {
//name //name
// //
...@@ -29,7 +79,7 @@ typedef struct tqTopicVhandle { ...@@ -29,7 +79,7 @@ typedef struct tqTopicVhandle {
// //
//callback for mnode //callback for mnode
// //
} tqTopic; } tqTopicVhandle;
typedef struct STQ { typedef struct STQ {
//the set for topics //the set for topics
...@@ -50,7 +100,7 @@ int tqPushMsg(STQ*, void* msg, int64_t version); ...@@ -50,7 +100,7 @@ int tqPushMsg(STQ*, void* msg, int64_t version);
int tqCommit(STQ*); int tqCommit(STQ*);
//void* will be replace by a msg type //void* will be replace by a msg type
int tqHandleMsg(STQ*, void* msg); int tqHandleConsumeMsg(STQ*, tmqConsumeReq* msg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -26,14 +26,15 @@ extern "C" { ...@@ -26,14 +26,15 @@ extern "C" {
typedef struct tqBufferItem { typedef struct tqBufferItem {
int64_t offset; int64_t offset;
void *content; void* executor;
void* content;
} tqBufferItem; } tqBufferItem;
typedef struct tqGroupHandle { typedef struct tqGroupHandle {
char* topic; char* topic; //c style, end with '\0'
void* ahandle;
int64_t cgId; int64_t cgId;
void* ahandle;
int64_t consumeOffset; int64_t consumeOffset;
int32_t head; int32_t head;
int32_t tail; int32_t tail;
......
...@@ -77,7 +77,7 @@ int tqCommit(STQ* pTq) { ...@@ -77,7 +77,7 @@ int tqCommit(STQ* pTq) {
return 0; return 0;
} }
int tqHandleMsg(STQ* pTq, void*msg) { int tqHandleConsumeMsg(STQ* pTq, tmqConsumeReq* msg) {
//parse msg and extract topic and cgId //parse msg and extract topic and cgId
//lookup handle //lookup handle
//confirm message and send to consumer //confirm message and send to consumer
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册