未验证 提交 0acfddf3 编写于 作者: L Liu Jicong 提交者: GitHub

merge tq related (#8415)

add tq data structure
上级 f65be084
......@@ -24,9 +24,10 @@ extern "C" {
typedef struct tmqMsgHead {
int32_t headLen;
int32_t msgVer;
int32_t protoVer;
int64_t cgId;
int64_t topicId;
int64_t clientId;
int32_t checksum;
int32_t msgType;
} tmqMsgHead;
......@@ -34,35 +35,43 @@ typedef struct tmqMsgHead {
//TODO: put msgs into common
typedef struct tmqConnectReq {
tmqMsgHead head;
} tmqConnectReq;
typedef struct tmqConnectResp {
tmqMsgHead head;
int8_t status;
} tmqConnectResp;
typedef struct tmqDisconnectReq {
tmqMsgHead head;
} tmqDisconnectReq;
typedef struct tmqDisconnectResp {
tmqMsgHead head;
int8_t status;
} tmqDiconnectResp;
typedef struct tmqConsumeReq {
tmqMsgHead head;
int64_t commitOffset;
} tmqConsumeReq;
typedef struct tmqConsumeResp {
tmqMsgHead head;
char content[];
} tmqConsumeResp;
typedef struct tmqSubscribeReq {
//
typedef struct tmqMnodeSubscribeReq {
tmqMsgHead head;
int64_t topicLen;
char topic[];
} tmqSubscribeReq;
typedef struct tmqSubscribeResp {
typedef struct tmqMnodeSubscribeResp {
tmqMsgHead head;
int64_t vgId;
char ep[]; //TSDB_EP_LEN
} tmqSubscribeResp;
typedef struct tmqHeartbeatReq {
......@@ -92,6 +101,24 @@ typedef struct STQ {
//value=consumeOffset: int64_t
} STQ;
#define TQ_BUFFER_SIZE 8
typedef struct tqBufferItem {
int64_t offset;
void* executor;
void* content;
} tqBufferItem;
typedef struct tqGroupHandle {
char* topic; //c style, end with '\0'
int64_t cgId;
void* ahandle;
int64_t consumeOffset;
int32_t head;
int32_t tail;
tqBufferItem buffer[TQ_BUFFER_SIZE];
} tqGroupHandle;
//init in each vnode
STQ* tqInit(void* ref_func(void*), void* unref_func(void*));
void tqCleanUp(STQ*);
......@@ -103,6 +130,17 @@ int tqCommit(STQ*);
//void* will be replace by a msg type
int tqHandleConsumeMsg(STQ*, tmqConsumeReq* msg);
tqGroupHandle* tqFindGHandleBycId(STQ*, int64_t cId);
int tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int tqMoveOffsetToNext(tqGroupHandle*);
int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset);
int tqFetchMsg(tqGroupHandle*, void*);
int tqRegisterContext(tqGroupHandle*, void*);
int tqLaunchQuery(STQ*, int64_t topicId, int64_t cgId, void* query);
int tqSendLaunchQuery(STQ*, int64_t topicId, int64_t cgId, void* query);
#ifdef __cplusplus
}
#endif
......
......@@ -26,6 +26,7 @@
#include "dnodeStatus.h"
#include "mnode.h"
#include "vnode.h"
#include "mnode.h"
typedef void (*RpcMsgFp)(SRpcMsg *pMsg);
......@@ -144,7 +145,6 @@ static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
RpcMsgFp fp = tsTrans.peerMsgFp[msgType];
if (fp != NULL) {
dTrace("RPC %p, peer rsp:%s will be processed", pMsg->handle, taosMsg[msgType]);
(*fp)(pMsg);
} else {
dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[msgType]);
......
......@@ -218,10 +218,33 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead) {
}
//mq related
int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead){
int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead) {
//parse message and optionally move offset
void* pMsg = pRead->pCont;
tmqConsumeReq *pConsumeMsg = (tmqConsumeReq*) pMsg;
tmqMsgHead msgHead = pConsumeMsg->head;
//extract head
STQ *pTq = pVnode->pTQ;
tqGroupHandle *pHandle = tqFindGHandleBycId(pTq, msgHead.clientId);
//return msg if offset not moved
if(pConsumeMsg->commitOffset == pHandle->consumeOffset) {
//return msg
return 0;
}
//or move offset
tqMoveOffsetToNext(pHandle);
//fetch or register context
tqFetchMsg(pHandle, pRead);
//judge mode, tail read or catch up read
//launch new query
return 0;
}
int32_t vnodeProcessTqQueryMsg(SVnode *pVnode, SReadMsg *pRead) {
//get operator tree from tq data structure
//execute operator tree
//put data into ringbuffer
//unref memory
return 0;
}
//mq related end
......
......@@ -18,37 +18,19 @@
#include "tq.h"
#define TQ_BUFFER_SIZE 8
#ifdef __cplusplus
extern "C" {
#endif
typedef struct tqBufferItem {
int64_t offset;
void* executor;
void* content;
} tqBufferItem;
typedef struct tqGroupHandle {
char* topic; //c style, end with '\0'
int64_t cgId;
void* ahandle;
int64_t consumeOffset;
int32_t head;
int32_t tail;
tqBufferItem buffer[TQ_BUFFER_SIZE];
} tqGroupHandle;
//create persistent storage for meta info such as consuming offset
//return value > 0: cgId
//return value <= 0: error code
int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqGroupHandle** handle);
//create ring buffer in memory and load consuming offset
int tqOpenTCGroup(STQ*, const char* topic, int cgId);
//int tqOpenTCGroup(STQ*, const char* topic, int cgId);
//destroy ring buffer and persist consuming offset
int tqCloseTCGroup(STQ*, const char* topic, int cgId);
//int tqCloseTCGroup(STQ*, const char* topic, int cgId);
//delete persistent storage for meta info
int tqDropTCGroup(STQ*, const char* topic, int cgId);
......
......@@ -22,7 +22,7 @@
//
//handle management message
static tqGroupHandle* tqLookupGroupHandle(STQ *pTq, const char* topic, int cgId) {
tqGroupHandle* tqLookupGroupHandle(STQ *pTq, const char* topic, int cgId) {
//look in memory
//
//not found, try to restore from disk
......@@ -56,9 +56,9 @@ int tqOpenTGroup(STQ* pTq, const char* topic, int cgId) {
return 0;
}
int tqCloseTCGroup(STQ* pTq, const char* topic, int cgId) {
tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId);
return tqCommitTCGroup(handle);
/*int tqCloseTCGroup(STQ* pTq, const char* topic, int cgId) {*/
/*tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId);*/
/*return tqCommitTCGroup(handle);*/
}
int tqDropTCGroup(STQ* pTq, const char* topic, int cgId) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册