From a1fbaf30ab6491ca27dfdd46cc109ad5bc9da113 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 13 Oct 2021 17:53:10 +0800 Subject: [PATCH] tq data structure defined --- .gitignore | 4 ++- include/server/vnode/tq/tq.h | 22 ++++++++++-- source/server/vnode/tq/inc/tqInt.h | 27 +++++++++++---- source/server/vnode/tq/src/tq.c | 55 +++++++++++++++++++++++++++++- 4 files changed, 98 insertions(+), 10 deletions(-) diff --git a/.gitignore b/.gitignore index 5141448ee0..0b98a1b161 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ build/ +compile_commands.json +.cache .ycm_extra_conf.py .vscode/ .idea/ @@ -96,4 +98,4 @@ tramp TAGS deps/* -!deps/CMakeLists.txt \ No newline at end of file +!deps/CMakeLists.txt diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index dd355c8381..eb9c57c581 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -22,8 +22,26 @@ extern "C" { #endif -typedef struct STQ STQ; - +typedef struct tqTopicVhandle { + //name + // + //executor for filter + // + //callback for mnode + // +} tqTopic; + +typedef struct STQ { + //the set for topics + //key=topicName: str + //value=tqTopicVhandle + + //a map + //key= + //value=consumeOffset: int64_t +} STQ; + +//init in each vnode STQ* tqInit(void* ref_func(void*), void* unref_func(void*)); void tqCleanUp(STQ*); diff --git a/source/server/vnode/tq/inc/tqInt.h b/source/server/vnode/tq/inc/tqInt.h index a51f0b03af..c42bcfef43 100644 --- a/source/server/vnode/tq/inc/tqInt.h +++ b/source/server/vnode/tq/inc/tqInt.h @@ -18,23 +18,38 @@ #include "tq.h" +#define TQ_BUFFER_SIZE 8 + #ifdef __cplusplus extern "C" { #endif -//implement the array index -//implement the ring buffer +typedef struct tqBufferItem { + int64_t offset; + void *content; +} tqBufferItem; + + +typedef struct tqGroupHandle { + char* topic; + void* ahandle; + int64_t cgId; + int64_t consumeOffset; + int32_t head; + int32_t tail; + tqBufferItem buffer[TQ_BUFFER_SIZE]; +} tqGroupHandle; //create persistent storage for meta info such as consuming offset //return value > 0: cgId //return value <= 0: error code -int tqCreateGroup(STQ*); +int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqGroupHandle** handle); //create ring buffer in memory and load consuming offset -int tqOpenGroup(STQ*, int cgId); +int tqOpenTCGroup(STQ*, const char* topic, int cgId); //destroy ring buffer and persist consuming offset -int tqCloseGroup(STQ*, int cgId); +int tqCloseTCGroup(STQ*, const char* topic, int cgId); //delete persistent storage for meta info -int tqDropGroup(STQ*, int cgId); +int tqDropTCGroup(STQ*, const char* topic, int cgId); #ifdef __cplusplus } diff --git a/source/server/vnode/tq/src/tq.c b/source/server/vnode/tq/src/tq.c index 3255f3fb3a..2ef2a4b6ea 100644 --- a/source/server/vnode/tq/src/tq.c +++ b/source/server/vnode/tq/src/tq.c @@ -22,12 +22,65 @@ // //handle management message +static tqGroupHandle* tqLookupGroupHandle(STQ *pTq, const char* topic, int cgId) { + //look in memory + // + //not found, try to restore from disk + // + //still not found + return NULL; +} + +static int tqCommitTCGroup(tqGroupHandle* handle) { + //persist into disk + return 0; +} + +int tqCreateTCGroup(STQ *pTq, const char* topic, int cgId, tqGroupHandle** handle) { + return 0; +} + +int tqOpenTGroup(STQ* pTq, const char* topic, int cgId) { + int code; + tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId); + if(handle == NULL) { + code = tqCreateTCGroup(pTq, topic, cgId, &handle); + if(code != 0) { + return code; + } + } + ASSERT(handle != NULL); + + //put into STQ + + return 0; +} + +int tqCloseTCGroup(STQ* pTq, const char* topic, int cgId) { + tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId); + return tqCommitTCGroup(handle); +} + +int tqDropTCGroup(STQ* pTq, const char* topic, int cgId) { + //delete from disk + return 0; +} + int tqPushMsg(STQ* pTq , void* p, int64_t version) { //add reference - // + //judge and launch new query return 0; } int tqCommit(STQ* pTq) { + //do nothing + return 0; +} + +int tqHandleMsg(STQ* pTq, void*msg) { + //parse msg and extract topic and cgId + //lookup handle + //confirm message and send to consumer + //judge and launch new query return 0; } -- GitLab