From 0f3db7d7c2c721ac2a10059033decbf703963d95 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 8 Oct 2021 11:37:54 +0800 Subject: [PATCH] add tq header --- CMakeLists.txt | 2 +- include/client/consumer/consumer.h | 24 ++++++++++++++---------- include/server/vnode/tq/tq.h | 20 ++++++++++++++++++-- source/client/consumer/consumer.c | 4 +++- source/server/vnode/tq/CMakeLists.txt | 7 ++++++- source/server/vnode/tq/inc/tqInt.h | 5 ++++- source/server/vnode/tq/src/tq.c | 11 +++++++++-- 7 files changed, 55 insertions(+), 18 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 95e1cad2fe..efb92b114f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -54,4 +54,4 @@ add_library(api INTERFACE ${API_SRC}) # src add_subdirectory(source) -# tests (TODO) \ No newline at end of file +# tests (TODO) diff --git a/include/client/consumer/consumer.h b/include/client/consumer/consumer.h index 742e2c12fe..8d1c9835e6 100644 --- a/include/client/consumer/consumer.h +++ b/include/client/consumer/consumer.h @@ -16,6 +16,10 @@ #ifndef _TD_CONSUMER_H_ #define _TD_CONSUMER_H_ +#include "tlist.h" +#include "tarray.h" +#include "hash.h" + #ifdef __cplusplus extern "C" { #endif @@ -32,16 +36,15 @@ extern "C" { struct tmq_resp_err_t; typedef struct tmq_resp_err_t tmq_resp_err_t; - //topic list - //resouces are supposed to be free by users by calling tmq_list_destroy - struct tmq_topic_list_t; - typedef struct tmq_topic_list_t tmq_topic_list_t; - int32_t tmq_topic_list_add(tmq_topic_list_t*, const char*); - void tmq_topic_list_destroy(tmq_topic_list_t*); + struct tmq_message_t; + typedef struct tmq_message_t tmq_message_t; + + struct tmq_col_batch_t; + typedef struct tmq_col_batch_t tmq_col_batch_t; //get content of message - tmq_col_batch_t *tmq_get_msg_col_by_idx(tmq_message_t*, int32_t); - tmq_col_batch_t *tmq_get_msg_col_by_name(tmq_message_t*, const char*); + tmq_col_batch_t* tmq_get_msg_col_by_idx(tmq_message_t*, int32_t col_id); + tmq_col_batch_t* tmq_get_msg_col_by_name(tmq_message_t*, const char*); //consumer config int32_t tmq_conf_set(tmq_consumer_config_t* , const char* config_key, const char* config_value, char* errstr, int32_t errstr_cap); @@ -51,11 +54,12 @@ extern "C" { tmq_consumer_t* tmq_consumer_new(tmq_consumer_config_t* , char* errstr, int32_t errstr_cap); //subscribe - tmq_resp_err_t tmq_subscribe(tmq_consumer_t*, const tmq_topic_list_t*); + tmq_resp_err_t tmq_subscribe(tmq_consumer_t*, const SList*); + tmq_resp_err_t tmq_unsubscribe(tmq_consumer_t*); //consume //resouces are supposed to be free by users by calling tmq_message_destroy - tmq_message_t tmq_consume_poll(tmq_consumer_t*, int64_t blocking_time); + tmq_message_t* tmq_consume_poll(tmq_consumer_t*, int64_t blocking_time); //destroy message and free memory void tmq_message_destroy(tmq_message_t*); diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index 4c626a1e25..3c163f5045 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -16,17 +16,33 @@ #ifndef _TD_TQ_H_ #define _TD_TQ_H_ +#include "os.h" + #ifdef __cplusplus extern "C" { #endif typedef struct STQ STQ; -int tqPushMsg(void *); +STQ* tqInit(); +void tqCleanUp(STQ* pTQ); + +//create persistent storage for meta info such as consuming offset +//return value > 0: cgId +//return value < 0: error code +int tqCreateGroup(STQ *pTQ); +//create ring buffer in memory and load consuming offset +int tqOpenGroup(STQ* pTQ, int cgId); +//destroy ring buffer and persist consuming offset +int tqCloseGroup(STQ *pTQ, int cgId); +//delete persistent storage for meta info +int tqDropGroup(STQ *pTQ); + +int tqPushMsg(STQ *pTQ, void *, int64_t version); int tqCommit(STQ *pTQ); #ifdef __cplusplus } #endif -#endif /*_TD_TQ_H_*/ \ No newline at end of file +#endif /*_TD_TQ_H_*/ diff --git a/source/client/consumer/consumer.c b/source/client/consumer/consumer.c index 6dea4a4e57..4ba1f95144 100644 --- a/source/client/consumer/consumer.c +++ b/source/client/consumer/consumer.c @@ -11,4 +11,6 @@ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - */ \ No newline at end of file + */ + +#include "consumer.h" diff --git a/source/server/vnode/tq/CMakeLists.txt b/source/server/vnode/tq/CMakeLists.txt index f4102b8d4b..7e80da75e9 100644 --- a/source/server/vnode/tq/CMakeLists.txt +++ b/source/server/vnode/tq/CMakeLists.txt @@ -3,5 +3,10 @@ add_library(tq ${TQ_SRC}) target_include_directories( tq PUBLIC "${CMAKE_SOURCE_DIR}/include/server/vnode/tq" + PRIVATE "${CMAKE_SOURCE_DIR}/include/os" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" -) \ No newline at end of file +) + +target_link_libraries( + os +) diff --git a/source/server/vnode/tq/inc/tqInt.h b/source/server/vnode/tq/inc/tqInt.h index 9a4b973106..435a1150b4 100644 --- a/source/server/vnode/tq/inc/tqInt.h +++ b/source/server/vnode/tq/inc/tqInt.h @@ -20,8 +20,11 @@ extern "C" { #endif +//implement the array index +//implement the ring buffer + #ifdef __cplusplus } #endif -#endif /*_TD_TQ_INT_H_*/ \ No newline at end of file +#endif /*_TD_TQ_INT_H_*/ diff --git a/source/server/vnode/tq/src/tq.c b/source/server/vnode/tq/src/tq.c index a539182115..f88c203fc9 100644 --- a/source/server/vnode/tq/src/tq.c +++ b/source/server/vnode/tq/src/tq.c @@ -15,5 +15,12 @@ #include "tq.h" -int tqPushMsg(void * p) {return 0;} -int tqCommit(STQ *pTQ) {return 0;} +int tqPushMsg(STQ *pTQ, void * p, int64_t version) { + //add reference + // + return 0; +} + +int tqCommit(STQ *pTQ) { + return 0; +} -- GitLab