diff --git a/CMakeLists.txt b/CMakeLists.txt index 95e1cad2fe93a9a0e78f4e3cb83dac361d043fd7..efb92b114f4e87229d3e9957efda02379f647c9c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -54,4 +54,4 @@ add_library(api INTERFACE ${API_SRC}) # src add_subdirectory(source) -# tests (TODO) \ No newline at end of file +# tests (TODO) diff --git a/include/client/consumer/consumer.h b/include/client/consumer/consumer.h index 742e2c12fe08175f8dc0d05cce9b83976e01673a..8d1c9835e643d6f2fdddf32e5aef23ef1f0fcb44 100644 --- a/include/client/consumer/consumer.h +++ b/include/client/consumer/consumer.h @@ -16,6 +16,10 @@ #ifndef _TD_CONSUMER_H_ #define _TD_CONSUMER_H_ +#include "tlist.h" +#include "tarray.h" +#include "hash.h" + #ifdef __cplusplus extern "C" { #endif @@ -32,16 +36,15 @@ extern "C" { struct tmq_resp_err_t; typedef struct tmq_resp_err_t tmq_resp_err_t; - //topic list - //resouces are supposed to be free by users by calling tmq_list_destroy - struct tmq_topic_list_t; - typedef struct tmq_topic_list_t tmq_topic_list_t; - int32_t tmq_topic_list_add(tmq_topic_list_t*, const char*); - void tmq_topic_list_destroy(tmq_topic_list_t*); + struct tmq_message_t; + typedef struct tmq_message_t tmq_message_t; + + struct tmq_col_batch_t; + typedef struct tmq_col_batch_t tmq_col_batch_t; //get content of message - tmq_col_batch_t *tmq_get_msg_col_by_idx(tmq_message_t*, int32_t); - tmq_col_batch_t *tmq_get_msg_col_by_name(tmq_message_t*, const char*); + tmq_col_batch_t* tmq_get_msg_col_by_idx(tmq_message_t*, int32_t col_id); + tmq_col_batch_t* tmq_get_msg_col_by_name(tmq_message_t*, const char*); //consumer config int32_t tmq_conf_set(tmq_consumer_config_t* , const char* config_key, const char* config_value, char* errstr, int32_t errstr_cap); @@ -51,11 +54,12 @@ extern "C" { tmq_consumer_t* tmq_consumer_new(tmq_consumer_config_t* , char* errstr, int32_t errstr_cap); //subscribe - tmq_resp_err_t tmq_subscribe(tmq_consumer_t*, const tmq_topic_list_t*); + tmq_resp_err_t tmq_subscribe(tmq_consumer_t*, const SList*); + tmq_resp_err_t tmq_unsubscribe(tmq_consumer_t*); //consume //resouces are supposed to be free by users by calling tmq_message_destroy - tmq_message_t tmq_consume_poll(tmq_consumer_t*, int64_t blocking_time); + tmq_message_t* tmq_consume_poll(tmq_consumer_t*, int64_t blocking_time); //destroy message and free memory void tmq_message_destroy(tmq_message_t*); diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index 4c626a1e25af0ddcef4018766a06ff594b013740..3c163f50451edb7de026789a8c92300866398cc0 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -16,17 +16,33 @@ #ifndef _TD_TQ_H_ #define _TD_TQ_H_ +#include "os.h" + #ifdef __cplusplus extern "C" { #endif typedef struct STQ STQ; -int tqPushMsg(void *); +STQ* tqInit(); +void tqCleanUp(STQ* pTQ); + +//create persistent storage for meta info such as consuming offset +//return value > 0: cgId +//return value < 0: error code +int tqCreateGroup(STQ *pTQ); +//create ring buffer in memory and load consuming offset +int tqOpenGroup(STQ* pTQ, int cgId); +//destroy ring buffer and persist consuming offset +int tqCloseGroup(STQ *pTQ, int cgId); +//delete persistent storage for meta info +int tqDropGroup(STQ *pTQ); + +int tqPushMsg(STQ *pTQ, void *, int64_t version); int tqCommit(STQ *pTQ); #ifdef __cplusplus } #endif -#endif /*_TD_TQ_H_*/ \ No newline at end of file +#endif /*_TD_TQ_H_*/ diff --git a/source/client/consumer/consumer.c b/source/client/consumer/consumer.c index 6dea4a4e57392be988126c579648f39a8270b9bf..4ba1f9514444f8972bcb4c72cdb8a3c3c681eeec 100644 --- a/source/client/consumer/consumer.c +++ b/source/client/consumer/consumer.c @@ -11,4 +11,6 @@ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - */ \ No newline at end of file + */ + +#include "consumer.h" diff --git a/source/server/vnode/tq/CMakeLists.txt b/source/server/vnode/tq/CMakeLists.txt index f4102b8d4b3e598c2afadbbc2476d487530f8fb2..7e80da75e9e5c0824f587e04fabe5899aae9bbe5 100644 --- a/source/server/vnode/tq/CMakeLists.txt +++ b/source/server/vnode/tq/CMakeLists.txt @@ -3,5 +3,10 @@ add_library(tq ${TQ_SRC}) target_include_directories( tq PUBLIC "${CMAKE_SOURCE_DIR}/include/server/vnode/tq" + PRIVATE "${CMAKE_SOURCE_DIR}/include/os" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" -) \ No newline at end of file +) + +target_link_libraries( + os +) diff --git a/source/server/vnode/tq/inc/tqInt.h b/source/server/vnode/tq/inc/tqInt.h index 9a4b9731065270b66f2b75d3a372874b941eaa55..435a1150b41c036009aae6b0088e14975fa54e5f 100644 --- a/source/server/vnode/tq/inc/tqInt.h +++ b/source/server/vnode/tq/inc/tqInt.h @@ -20,8 +20,11 @@ extern "C" { #endif +//implement the array index +//implement the ring buffer + #ifdef __cplusplus } #endif -#endif /*_TD_TQ_INT_H_*/ \ No newline at end of file +#endif /*_TD_TQ_INT_H_*/ diff --git a/source/server/vnode/tq/src/tq.c b/source/server/vnode/tq/src/tq.c index a5391821155bd260b73300117d5545d987168220..f88c203fc984e3d36fc7300dcecb3e0370a91b61 100644 --- a/source/server/vnode/tq/src/tq.c +++ b/source/server/vnode/tq/src/tq.c @@ -15,5 +15,12 @@ #include "tq.h" -int tqPushMsg(void * p) {return 0;} -int tqCommit(STQ *pTQ) {return 0;} +int tqPushMsg(STQ *pTQ, void * p, int64_t version) { + //add reference + // + return 0; +} + +int tqCommit(STQ *pTQ) { + return 0; +}