diff --git a/include/client/consumer/consumer.h b/include/client/consumer/consumer.h index b957f1a0257f1dd4d2fbcc911692c05b30fa7e5c..742e2c12fe08175f8dc0d05cce9b83976e01673a 100644 --- a/include/client/consumer/consumer.h +++ b/include/client/consumer/consumer.h @@ -20,8 +20,55 @@ extern "C" { #endif + //consumer handle + struct tmq_consumer_t; + typedef struct tmq_consumer_t tmq_consumer_t; + + //consumer config + struct tmq_consumer_config_t; + typedef struct tmq_consumer_config_t tmq_consumer_config_t; + + //response err + struct tmq_resp_err_t; + typedef struct tmq_resp_err_t tmq_resp_err_t; + + //topic list + //resouces are supposed to be free by users by calling tmq_list_destroy + struct tmq_topic_list_t; + typedef struct tmq_topic_list_t tmq_topic_list_t; + int32_t tmq_topic_list_add(tmq_topic_list_t*, const char*); + void tmq_topic_list_destroy(tmq_topic_list_t*); + + //get content of message + tmq_col_batch_t *tmq_get_msg_col_by_idx(tmq_message_t*, int32_t); + tmq_col_batch_t *tmq_get_msg_col_by_name(tmq_message_t*, const char*); + + //consumer config + int32_t tmq_conf_set(tmq_consumer_config_t* , const char* config_key, const char* config_value, char* errstr, int32_t errstr_cap); + + //consumer initialization + //resouces are supposed to be free by users by calling tmq_consumer_destroy + tmq_consumer_t* tmq_consumer_new(tmq_consumer_config_t* , char* errstr, int32_t errstr_cap); + + //subscribe + tmq_resp_err_t tmq_subscribe(tmq_consumer_t*, const tmq_topic_list_t*); + + //consume + //resouces are supposed to be free by users by calling tmq_message_destroy + tmq_message_t tmq_consume_poll(tmq_consumer_t*, int64_t blocking_time); + + //destroy message and free memory + void tmq_message_destroy(tmq_message_t*); + + //close consumer + int32_t tmq_consumer_close(tmq_consumer_t*); + + //destroy consumer + void tmq_consumer_destroy(tmq_message_t*); + + #ifdef __cplusplus } #endif -#endif /*_TD_CONSUMER_H_*/ \ No newline at end of file +#endif /*_TD_CONSUMER_H_*/ diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 498c33b6e6cfd07dddbad972be5d761b8f7278ea..8f6d09f8d6251edd9a5398ab1752f366a88e2849 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -12,7 +12,6 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ - #ifndef _TD_WAL_H_ #define _TD_WAL_H_ @@ -20,8 +19,59 @@ extern "C" { #endif +typedef enum { + TAOS_WAL_NOLOG = 0, + TAOS_WAL_WRITE = 1 +} EWalType; + +typedef struct { + int8_t msgType; + int8_t sver; // sver 2 for WAL SDataRow/SMemRow compatibility + int8_t reserved[2]; + int32_t len; + int64_t version; + uint32_t signature; + uint32_t cksum; + char cont[]; +} SWalHead; + +typedef struct { + int32_t vgId; + int32_t fsyncPeriod; // millisecond + EWalType walLevel; // wal level +} SWalCfg; + +typedef void * twalh; // WAL HANDLE +typedef int32_t FWalWrite(void *ahandle, void *pHead, int32_t qtype, void *pMsg); + +//module initialization +int32_t walInit(); +void walCleanUp(); + +//handle open and ctl +twalh walOpen(char *path, SWalCfg *pCfg); +int32_t walAlter(twalh, SWalCfg *pCfg); +void walStop(twalh); +void walClose(twalh); + +//write +int32_t walWrite(twalh, int8_t msgType, void* body, uint32_t bodyLen); +void walWaitFsync(twalh, bool forceHint); +//int32_t walCommit(twalh, uint64_t ver); +//int32_t walRollback(twalh, uint64_t ver); + +//read +int32_t walRead(twalh, SWalHead **, int64_t ver); +int32_t walReadWithFp(twalh, FWalWrite writeFp, int64_t verStart, int readNum); + +//life cycle +int32_t walDataPersisted(twalh, int64_t ver); +int32_t walFirstVer(twalh); +int32_t walLastVer(twal); +//int32_t walDataCorrupted(twalh); + #ifdef __cplusplus } #endif -#endif /*_TD_WAL_H_*/ \ No newline at end of file +#endif // _TD_WAL_H_